Vault  4.1
vmessageinputthread.cpp
00001 /*
00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00008 #include "vmessageinputthread.h"
00009 
00010 #include "vexception.h"
00011 #include "vmessagehandler.h"
00012 #include "vlogger.h"
00013 #include "vmessage.h"
00014 #include "vclientsession.h"
00015 #include "vbento.h"
00016 
00017 // VMessageInputThread --------------------------------------------------------
00018 
00019 VMessageInputThread::VMessageInputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, const VMessageFactory* messageFactory)
00020     : VSocketThread(threadBaseName, socket, ownerThread)
00021     , mSocketStream(socket, "VMessageInputThread")
00022     , mInputStream(mSocketStream)
00023     , mConnected(false)
00024     , mSession()
00025     , mServer(server)
00026     , mMessageFactory(messageFactory)
00027     , mHasOutputThread(false)
00028     {
00029 }
00030 
00031 VMessageInputThread::~VMessageInputThread() {
00032     // If we have a session, it is responsible for deleting the mSocket, not us.
00033     // This is because a session has input and output threads, the order of whose
00034     // destruction is unpredictable; so we cannot let our base class delete the mSocket.
00035     if (mSession != nullptr) {
00036         mSocket = NULL;
00037     }
00038 
00039     mServer = NULL;
00040     mMessageFactory = NULL;
00041 }
00042 
00043 void VMessageInputThread::run() {
00044     /*
00045     We process messages until we're "done". Done is flagged by the subclass
00046     marking the thread done when it sees fit. The subclass must catch any
00047     exceptions that are not catastrophic, because we are the last resort,
00048     and if we catch an exception we complete the thread, which will cause
00049     the connection to be shut down (the subclass may need to shut down
00050     additional resources by overridding run() and post-processing it).
00051     Note that in the "error" exceptions below, we don't bother logging if
00052     we know that the exception is due to expected input thread shutdown,
00053     recognized by the fact that we are no longer in running state.
00054     */
00055 
00056     try {
00057         while (this->isRunning()) {
00058             this->_processNextRequest(); // Blocking read on socket; then message is dispatched.
00059         }
00060     } catch (const VEOFException& /*ex*/) {
00061         // Usually just means the client has closed the connection.
00062         VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Socket has closed (EOF), thread will end.", mName.chars()));
00063     } catch (const VSocketClosedException& /*ex*/) {
00064         VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Socket has closed, thread will end.", mName.chars()));
00065     } catch (const VException& ex) {
00066         if (this->isRunning()) {
00067             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level exception #%d '%s'.", mName.chars(), ex.getError(), ex.what()));
00068         }
00069     } catch (const std::exception& ex) {
00070         if (this->isRunning()) {
00071             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level exception '%s'.", mName.chars(), ex.what()));
00072         }
00073     } catch (...) {
00074         if (this->isRunning()) {
00075             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level unknown exception.", mName.chars()));
00076         }
00077     }
00078 
00079     if (mSession != nullptr) {
00080         mSession->shutdown(this);
00081     }
00082 
00083     // If we are dependent on an output thread, we must spin here until it clears the flag.
00084     const VDuration warnLimit = 15 * VDuration::SECOND();
00085     const VInstant startTime;
00086     bool warned = false;
00087     while (mHasOutputThread) {
00088         VThread::sleep(50 * VDuration::MILLISECOND());
00089         if (!warned) {
00090             const VInstant now;
00091             const VDuration duration = now - startTime;
00092             if (duration > warnLimit) {
00093                 warned = true;
00094                 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Still waiting for output thread to end after %s. Will warn again when output thread ends.", mName.chars(), duration.getDurationString().chars()));
00095             }
00096         }
00097     }
00098 
00099     if (warned) {
00100         const VInstant now;
00101         const VDuration duration = now - startTime;
00102         VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Finally saw output thread end after %s.", mName.chars(), duration.getDurationString().chars()));
00103     }
00104 }
00105 
00106 void VMessageInputThread::attachSession(VClientSessionPtr session) {
00107     mSession = session;
00108 }
00109 
00110 //lint -e429 "Custodial pointer 'message' has not been freed or returned" [OK: try or catch branches guarantee message is released.]
00111 void VMessageInputThread::_processNextRequest() {
00112     VMessagePtr message = mMessageFactory->instantiateNewMessage();
00113 
00114     /*
00115     RULES FOR EXCEPTION HANDLING IN REQUEST PROCESSING FUNCTIONS.
00116     (This text is referenced from the other implementations of
00117     _processNextRequest().)
00118     Rules for exception handling here:
00119     1. Receive may throw to us. This situation indicates a serious stream error,
00120         and we should bail out and shut down the socket (not catching achieves this).
00121     2. Dispatch may NOT throw to us unless the error is serious enough
00122         to warrant shutting down the socket; the implementation must catch
00123         all recoverable errors, so that we may continue processing
00124         subsequent incoming messages, under the assumption that the
00125         previous message was correctly formatted even if we encountered a
00126         problem while attempting to handle it.
00127     3. Vault 4.0 uses VMessagePtr smart pointers, so we no longer need to
00128         catch here in order to release the message we instantiated above
00129         before re-throwing. So there is no longer a try/catch here at all.
00130     */
00131     message->receive(mName, mInputStream);
00132     this->_dispatchMessage(message);
00133 }
00134 
00135 void VMessageInputThread::_dispatchMessage(VMessagePtr message) {
00136     VMessageHandler* handler = VMessageHandler::get(message, mServer, mSession, this);
00137 
00138     if (handler == NULL) {
00139         VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: No message hander defined for message %d.", mName.chars(), (int) message->getMessageID()));
00140         this->_handleNoMessageHandler(message);
00141     } else {
00142         VInstant messageHandlerStart;
00143 
00144         /*
00145         PLEASE SEE COMMENTS IN VMessageInputThread::_processNextRequest() FOR THE
00146         RULES ON EXCEPTION HANDLING DURING REQUEST PROCESSING.
00147         */
00148         try {
00149             this->_beforeProcessMessage(handler, message);
00150             this->_callProcessMessage(handler);
00151             this->_afterProcessMessage(handler);
00152         } catch (const VException& ex) {
00153             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught exception for message %d: #%d %s", mName.chars(), (int) message->getMessageID(), ex.getError(), ex.what()));
00154         } catch (const std::exception& e) {
00155             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught exception for message ID %d: %s", mName.chars(), (int) message->getMessageID(), e.what()));
00156         } catch (...) {
00157             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught unknown exception for message ID %d.", mName.chars(), (int) message->getMessageID()));
00158         }
00159 
00160         delete handler;
00161     }
00162 }
00163 
00164 void VMessageInputThread::_callProcessMessage(VMessageHandler* handler) {
00165     handler->logProcessMessageStart();
00166     handler->processMessage();
00167     handler->logProcessMessageEnd();
00168 }
00169 
00170 // VBentoMessageInputThread ---------------------------------------------------
00171 
00172 VBentoMessageInputThread::VBentoMessageInputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, const VMessageFactory* messageFactory) :
00173     VMessageInputThread(threadBaseName, socket, ownerThread, server, messageFactory) {
00174 }
00175 
00176 void VBentoMessageInputThread::_handleNoMessageHandler(VMessagePtr message) {
00177     VBentoNode responseData("response");
00178     responseData.addInt("result", -1);
00179     responseData.addString("error-message", VSTRING_FORMAT("Invalid message ID %d. No handler defined.", (int) message->getMessageID()));
00180 
00181     VString bentoText;
00182     responseData.writeToBentoTextString(bentoText);
00183     VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] Error Reply: %s", mName.chars(), bentoText.chars()));
00184 
00185     VMessagePtr response = mMessageFactory->instantiateNewMessage();
00186     responseData.writeToStream(*response);
00187     VBinaryIOStream io(mSocketStream);
00188     response->send(mName, io);
00189 }
00190 
00191 void VBentoMessageInputThread::_callProcessMessage(VMessageHandler* handler) {
00192     try {
00193         VMessageInputThread::_callProcessMessage(handler);
00194     } catch (const std::exception& ex) {
00195         VBentoNode responseData("response");
00196         responseData.addInt("result", -1);
00197         responseData.addString("error-message", VSTRING_FORMAT("An error occurred processing the message: %s", ex.what()));
00198 
00199         VString bentoText;
00200         responseData.writeToBentoTextString(bentoText);
00201         VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] Error Reply: %s", mName.chars(), bentoText.chars()));
00202 
00203         VMessagePtr response = mMessageFactory->instantiateNewMessage();
00204         responseData.writeToStream(*response);
00205         VBinaryIOStream io(mSocketStream);
00206         response->send(mName, io);
00207     }
00208 }
00209 

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.