Vault
4.1
|
00001 /* 00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved. 00003 This file is part of the Code Vault version 4.1 00004 http://www.bombaydigital.com/ 00005 License: MIT. See LICENSE.md in the Vault top level directory. 00006 */ 00007 00008 #include "vmessageinputthread.h" 00009 00010 #include "vexception.h" 00011 #include "vmessagehandler.h" 00012 #include "vlogger.h" 00013 #include "vmessage.h" 00014 #include "vclientsession.h" 00015 #include "vbento.h" 00016 00017 // VMessageInputThread -------------------------------------------------------- 00018 00019 VMessageInputThread::VMessageInputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, const VMessageFactory* messageFactory) 00020 : VSocketThread(threadBaseName, socket, ownerThread) 00021 , mSocketStream(socket, "VMessageInputThread") 00022 , mInputStream(mSocketStream) 00023 , mConnected(false) 00024 , mSession() 00025 , mServer(server) 00026 , mMessageFactory(messageFactory) 00027 , mHasOutputThread(false) 00028 { 00029 } 00030 00031 VMessageInputThread::~VMessageInputThread() { 00032 // If we have a session, it is responsible for deleting the mSocket, not us. 00033 // This is because a session has input and output threads, the order of whose 00034 // destruction is unpredictable; so we cannot let our base class delete the mSocket. 00035 if (mSession != nullptr) { 00036 mSocket = NULL; 00037 } 00038 00039 mServer = NULL; 00040 mMessageFactory = NULL; 00041 } 00042 00043 void VMessageInputThread::run() { 00044 /* 00045 We process messages until we're "done". Done is flagged by the subclass 00046 marking the thread done when it sees fit. The subclass must catch any 00047 exceptions that are not catastrophic, because we are the last resort, 00048 and if we catch an exception we complete the thread, which will cause 00049 the connection to be shut down (the subclass may need to shut down 00050 additional resources by overridding run() and post-processing it). 00051 Note that in the "error" exceptions below, we don't bother logging if 00052 we know that the exception is due to expected input thread shutdown, 00053 recognized by the fact that we are no longer in running state. 00054 */ 00055 00056 try { 00057 while (this->isRunning()) { 00058 this->_processNextRequest(); // Blocking read on socket; then message is dispatched. 00059 } 00060 } catch (const VEOFException& /*ex*/) { 00061 // Usually just means the client has closed the connection. 00062 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Socket has closed (EOF), thread will end.", mName.chars())); 00063 } catch (const VSocketClosedException& /*ex*/) { 00064 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Socket has closed, thread will end.", mName.chars())); 00065 } catch (const VException& ex) { 00066 if (this->isRunning()) { 00067 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level exception #%d '%s'.", mName.chars(), ex.getError(), ex.what())); 00068 } 00069 } catch (const std::exception& ex) { 00070 if (this->isRunning()) { 00071 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level exception '%s'.", mName.chars(), ex.what())); 00072 } 00073 } catch (...) { 00074 if (this->isRunning()) { 00075 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Exiting due to top level unknown exception.", mName.chars())); 00076 } 00077 } 00078 00079 if (mSession != nullptr) { 00080 mSession->shutdown(this); 00081 } 00082 00083 // If we are dependent on an output thread, we must spin here until it clears the flag. 00084 const VDuration warnLimit = 15 * VDuration::SECOND(); 00085 const VInstant startTime; 00086 bool warned = false; 00087 while (mHasOutputThread) { 00088 VThread::sleep(50 * VDuration::MILLISECOND()); 00089 if (!warned) { 00090 const VInstant now; 00091 const VDuration duration = now - startTime; 00092 if (duration > warnLimit) { 00093 warned = true; 00094 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Still waiting for output thread to end after %s. Will warn again when output thread ends.", mName.chars(), duration.getDurationString().chars())); 00095 } 00096 } 00097 } 00098 00099 if (warned) { 00100 const VInstant now; 00101 const VDuration duration = now - startTime; 00102 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread: Finally saw output thread end after %s.", mName.chars(), duration.getDurationString().chars())); 00103 } 00104 } 00105 00106 void VMessageInputThread::attachSession(VClientSessionPtr session) { 00107 mSession = session; 00108 } 00109 00110 //lint -e429 "Custodial pointer 'message' has not been freed or returned" [OK: try or catch branches guarantee message is released.] 00111 void VMessageInputThread::_processNextRequest() { 00112 VMessagePtr message = mMessageFactory->instantiateNewMessage(); 00113 00114 /* 00115 RULES FOR EXCEPTION HANDLING IN REQUEST PROCESSING FUNCTIONS. 00116 (This text is referenced from the other implementations of 00117 _processNextRequest().) 00118 Rules for exception handling here: 00119 1. Receive may throw to us. This situation indicates a serious stream error, 00120 and we should bail out and shut down the socket (not catching achieves this). 00121 2. Dispatch may NOT throw to us unless the error is serious enough 00122 to warrant shutting down the socket; the implementation must catch 00123 all recoverable errors, so that we may continue processing 00124 subsequent incoming messages, under the assumption that the 00125 previous message was correctly formatted even if we encountered a 00126 problem while attempting to handle it. 00127 3. Vault 4.0 uses VMessagePtr smart pointers, so we no longer need to 00128 catch here in order to release the message we instantiated above 00129 before re-throwing. So there is no longer a try/catch here at all. 00130 */ 00131 message->receive(mName, mInputStream); 00132 this->_dispatchMessage(message); 00133 } 00134 00135 void VMessageInputThread::_dispatchMessage(VMessagePtr message) { 00136 VMessageHandler* handler = VMessageHandler::get(message, mServer, mSession, this); 00137 00138 if (handler == NULL) { 00139 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: No message hander defined for message %d.", mName.chars(), (int) message->getMessageID())); 00140 this->_handleNoMessageHandler(message); 00141 } else { 00142 VInstant messageHandlerStart; 00143 00144 /* 00145 PLEASE SEE COMMENTS IN VMessageInputThread::_processNextRequest() FOR THE 00146 RULES ON EXCEPTION HANDLING DURING REQUEST PROCESSING. 00147 */ 00148 try { 00149 this->_beforeProcessMessage(handler, message); 00150 this->_callProcessMessage(handler); 00151 this->_afterProcessMessage(handler); 00152 } catch (const VException& ex) { 00153 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught exception for message %d: #%d %s", mName.chars(), (int) message->getMessageID(), ex.getError(), ex.what())); 00154 } catch (const std::exception& e) { 00155 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught exception for message ID %d: %s", mName.chars(), (int) message->getMessageID(), e.what())); 00156 } catch (...) { 00157 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageInputThread::_dispatchMessage: Caught unknown exception for message ID %d.", mName.chars(), (int) message->getMessageID())); 00158 } 00159 00160 delete handler; 00161 } 00162 } 00163 00164 void VMessageInputThread::_callProcessMessage(VMessageHandler* handler) { 00165 handler->logProcessMessageStart(); 00166 handler->processMessage(); 00167 handler->logProcessMessageEnd(); 00168 } 00169 00170 // VBentoMessageInputThread --------------------------------------------------- 00171 00172 VBentoMessageInputThread::VBentoMessageInputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, const VMessageFactory* messageFactory) : 00173 VMessageInputThread(threadBaseName, socket, ownerThread, server, messageFactory) { 00174 } 00175 00176 void VBentoMessageInputThread::_handleNoMessageHandler(VMessagePtr message) { 00177 VBentoNode responseData("response"); 00178 responseData.addInt("result", -1); 00179 responseData.addString("error-message", VSTRING_FORMAT("Invalid message ID %d. No handler defined.", (int) message->getMessageID())); 00180 00181 VString bentoText; 00182 responseData.writeToBentoTextString(bentoText); 00183 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] Error Reply: %s", mName.chars(), bentoText.chars())); 00184 00185 VMessagePtr response = mMessageFactory->instantiateNewMessage(); 00186 responseData.writeToStream(*response); 00187 VBinaryIOStream io(mSocketStream); 00188 response->send(mName, io); 00189 } 00190 00191 void VBentoMessageInputThread::_callProcessMessage(VMessageHandler* handler) { 00192 try { 00193 VMessageInputThread::_callProcessMessage(handler); 00194 } catch (const std::exception& ex) { 00195 VBentoNode responseData("response"); 00196 responseData.addInt("result", -1); 00197 responseData.addString("error-message", VSTRING_FORMAT("An error occurred processing the message: %s", ex.what())); 00198 00199 VString bentoText; 00200 responseData.writeToBentoTextString(bentoText); 00201 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] Error Reply: %s", mName.chars(), bentoText.chars())); 00202 00203 VMessagePtr response = mMessageFactory->instantiateNewMessage(); 00204 responseData.writeToStream(*response); 00205 VBinaryIOStream io(mSocketStream); 00206 response->send(mName, io); 00207 } 00208 } 00209