Vault  4.1
vlistenerthread.cpp
Go to the documentation of this file.
00001 /*
00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00010 #include "vlistenerthread.h"
00011 #include "vtypes_internal.h"
00012 
00013 #include "vlistenersocket.h"
00014 #include "vsocketfactory.h"
00015 #include "vsocketthreadfactory.h"
00016 #include "vmanagementinterface.h"
00017 #include "vclientsession.h"
00018 #include "vexception.h"
00019 #include "vmutexlocker.h"
00020 #include "vlogger.h"
00021 #include "vmessageinputthread.h"
00022 #include "vmessageoutputthread.h"
00023 
00024 VListenerThread::VListenerThread(const VString& threadBaseName, bool deleteSelfAtEnd, bool createDetached, VManagementInterface* manager, int portNumber, const VString& bindAddress, VSocketFactory* socketFactory, VSocketThreadFactory* threadFactory, VClientSessionFactory* sessionFactory, bool initiallyListening)
00025     : VThread(threadBaseName, VSTRING_FORMAT("vault.messages.VListenerThread.%s.%d", threadBaseName.chars(), portNumber), deleteSelfAtEnd, createDetached, manager)
00026     , mPortNumber(portNumber)
00027     , mBindAddress(bindAddress)
00028     , mShouldListen(initiallyListening)
00029     , mSocketFactory(socketFactory)
00030     , mThreadFactory(threadFactory)
00031     , mSessionFactory(sessionFactory)
00032     , mSocketThreads()
00033     , mSocketThreadsMutex(VSTRING_FORMAT("VListenerThread(%s)::mSocketThreadsMutex", threadBaseName.chars()))
00034     {
00035 }
00036 
00037 VListenerThread::~VListenerThread() {
00038     VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("VListenerThread '%s' ended.", mName.chars()));
00039 
00040     // Make sure any of socket threads still alive no longer reference us.
00041     VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::socketThreadEnded()", this->getName().chars()));
00042     for (VSocketThreadPtrVector::const_iterator i = mSocketThreads.begin(); i != mSocketThreads.end(); ++i) {
00043         (*i)->mOwnerThread = NULL;
00044     }
00045 }
00046 
00047 void VListenerThread::stop() {
00048     this->stopListening();
00049     this->stopAllSocketThreads();
00050 
00051     VThread::stop();
00052 }
00053 
00054 void VListenerThread::run() {
00055     while (this->isRunning()) {
00056         if (mShouldListen) {
00057             this->_runListening();
00058         } else {
00059             VThread::sleep(5 * VDuration::SECOND()); // this value limits how quickly we can be shut down
00060         }
00061     }
00062 }
00063 
00064 void VListenerThread::socketThreadEnded(VSocketThread* socketThread) {
00065     VMutexLocker                        locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::socketThreadEnded()", this->getName().chars()));
00066     VSocketThreadPtrVector::iterator    position;
00067 
00068     position = std::find(mSocketThreads.begin(), mSocketThreads.end(), socketThread);
00069 
00070     if (position != mSocketThreads.end()) {
00071         mSocketThreads.erase(position);
00072     }
00073 }
00074 
00075 int VListenerThread::getPortNumber() const {
00076     return mPortNumber;
00077 }
00078 
00079 VSocketInfoVector VListenerThread::enumerateActiveSockets() {
00080     VSocketInfoVector   info;
00081     VMutexLocker        locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::enumerateActiveSockets()", this->getName().chars()));
00082 
00083     for (VSizeType i = 0; i < mSocketThreads.size(); ++i) {
00084         VSocketInfo oneSocketInfo(*(mSocketThreads[i]->getSocket()));
00085 
00086         info.push_back(oneSocketInfo);
00087     }
00088 
00089     return info;
00090 }
00091 
00092 void VListenerThread::stopSocketThread(VSocketID socketID, int localPortNumber) {
00093     bool            found = false;
00094     VMutexLocker    locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::stopSocketThread()", this->getName().chars()));
00095 
00096     for (VSizeType i = 0; i < mSocketThreads.size(); ++i) {
00097         VSocketThread*  thread = mSocketThreads[i];
00098         VSocket*        socket = thread->getSocket();
00099 
00100         if ((socket->getSockID() == socketID) &&
00101                 (socket->getPortNumber() == localPortNumber)) {
00102             found = true;
00103             thread->closeAndStop();
00104         }
00105     }
00106 
00107     if (! found) {
00108         throw VStackTraceException(VSTRING_FORMAT("VListenerThread::stopSocketThread did not find a socket with id %d and port %d.", socketID, localPortNumber));
00109     }
00110 }
00111 
00112 void VListenerThread::stopAllSocketThreads() {
00113     VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::stopAllSocketThreads()", this->getName().chars()));
00114 
00115     for (VSizeType i = 0; i < mSocketThreads.size(); ++i) {
00116         VSocketThread* thread = mSocketThreads[i];
00117 
00118         thread->closeAndStop();
00119     }
00120 }
00121 
00122 void VListenerThread::_runListening() {
00123     VListenerSocket* listenerSocket = NULL;
00124 
00125     if (mManager != NULL) {
00126         mManager->listenerStarting(this);
00127     }
00128 
00129     VString exceptionMessage; // filled in if catch block entered
00130     try {
00131         listenerSocket = new VListenerSocket(mPortNumber, mBindAddress, mSocketFactory);
00132         listenerSocket->listen();
00133 
00134         if (mManager != NULL) {
00135             mManager->listenerListening(this);
00136         }
00137 
00138         while (mShouldListen && this->isRunning()) {
00139             VSocket* theSocket = listenerSocket->accept();
00140 
00141             if (theSocket != NULL) {
00142                 try {
00143                     VMutexLocker locker(&mSocketThreadsMutex, VSTRING_FORMAT("[%s]VListenerThread::_runListening()", this->getName().chars()));
00144 
00145                     if (mSessionFactory == NULL) {
00146                         VSocketThread* thread = mThreadFactory->createThread(theSocket, this);
00147                         thread->start(); // throws if can't create OS thread
00148                         mSocketThreads.push_back(thread);
00149                     } else {
00150                         VClientSessionPtr session = mSessionFactory->createSession(theSocket, this); // throws if can't create OS thread(s)
00151                         VSocketThread* thread;
00152                         thread = session->getInputThread();
00153                         if (thread != NULL) {
00154                             mSocketThreads.push_back(thread);
00155                         }
00156 
00157                         thread = session->getOutputThread();
00158                         if (thread != NULL) {
00159                             mSocketThreads.push_back(thread);
00160                         }
00161 
00162                         mSessionFactory->addSessionToServer(session);
00163                     }
00164                 } catch (const VException& ex) {
00165                     // Likely cause: Failure in starting OS thread. Log, but keep listening.
00166                     VLOGGER_ERROR(VSTRING_FORMAT("[%s]VListenerThread::_runListening: Unable to create new session: Error %d. %s", this->getName().chars(), ex.getError(), ex.what()));
00167                     delete theSocket;
00168                 }
00169             } else {
00170                 /*
00171                 We timed out, which is normal if we have a timeout value.
00172                 As long as we haven't been stopped, we'll try again.
00173                 */
00174             }
00175         }
00176     } catch (const VException& ex) {
00177         exceptionMessage.format("[%s]VListenerThread::_runListening() caught exception #%d '%s'.", mName.chars(), ex.getError(), ex.what());
00178     } catch (const std::exception& ex) {
00179         exceptionMessage.format("[%s]VListenerThread::_runListening() caught exception '%s'.", mName.chars(), ex.what());
00180     } catch (...) {
00181         exceptionMessage.format("[%s]VListenerThread::_runListening() caught unknown exception.", mName.chars());
00182     }
00183 
00184     if (exceptionMessage.isNotEmpty()) {
00185         mShouldListen = false;
00186         VLOGGER_NAMED_ERROR(mLoggerName, exceptionMessage);
00187         if (mManager != NULL) {
00188             mManager->listenerFailed(this, exceptionMessage);
00189         }
00190     }
00191 
00192     delete listenerSocket;
00193 
00194     if (mManager != NULL) {
00195         mManager->listenerEnded(this);
00196     }
00197 }
00198 

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.