Vault
4.1
|
00001 /* 00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved. 00003 This file is part of the Code Vault version 4.1 00004 http://www.bombaydigital.com/ 00005 License: MIT. See LICENSE.md in the Vault top level directory. 00006 */ 00007 00008 #include "vmessageoutputthread.h" 00009 00010 #include "vexception.h" 00011 #include "vclientsession.h" 00012 #include "vsocket.h" 00013 #include "vmessageinputthread.h" 00014 #include "vlogger.h" 00015 00016 // VMessageOutputThread ------------------------------------------------------- 00017 00018 VMessageOutputThread::VMessageOutputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, VClientSessionPtr session, VMessageInputThread* dependentInputThread, int maxQueueSize, Vs64 maxQueueDataSize, const VDuration& maxQueueGracePeriod) 00019 : VSocketThread(threadBaseName, socket, ownerThread) 00020 , mOutputQueue() 00021 , mSocketStream(socket, "VMessageOutputThread") 00022 , mOutputStream(mSocketStream) 00023 , mServer(server) 00024 , mSession(session) 00025 , mDependentInputThread(dependentInputThread) 00026 , mMaxQueueSize(maxQueueSize) 00027 , mMaxQueueDataSize(maxQueueDataSize) 00028 , mMaxQueueGracePeriod(maxQueueGracePeriod) 00029 , mWhenMaxQueueSizeWarned(VInstant() - VDuration::MINUTE()) // one minute ago (past warning throttle threshold) 00030 , mWasOverLimit(false) 00031 , mWhenWentOverLimit(VInstant::NEVER_OCCURRED()) 00032 { 00033 00034 if (mDependentInputThread != NULL) { 00035 mDependentInputThread->setHasOutputThread(true); 00036 } 00037 } 00038 00039 VMessageOutputThread::~VMessageOutputThread() { 00040 mOutputQueue.releaseAllMessages(); 00041 00042 /* 00043 We share the socket w/ the input thread. We sort of let the input 00044 thread be the owner. So we don't want our superclass to see 00045 mSocket and clean it up. Just set it to NULL so that the other 00046 class will be the one to do so. 00047 */ 00048 mSocket = NULL; 00049 00050 mServer = NULL; 00051 mDependentInputThread = NULL; 00052 } 00053 00054 void VMessageOutputThread::run() { 00055 try { 00056 while (this->isRunning()) { 00057 this->_processNextOutboundMessage(); 00058 } 00059 } catch (const VSocketClosedException& /*ex*/) { 00060 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Socket has closed, thread will end.", mName.chars())); 00061 } catch (const VException& ex) { 00062 /* 00063 Unlike the input threads, we shouldn't normally get an EOF exception to indicate that the 00064 connection has been closed normally, because we are an output thread. So any exceptions 00065 that land here uncaught are socket i/o errors and are logged as such. However, if our thread 00066 has been told to stop -- is no longer in running state -- then exceptions due to the socket 00067 being closed programmatically are to be expected, so we check that before logging an error. 00068 */ 00069 if (this->isRunning()) { 00070 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::run: Exiting due to top level exception #%d '%s'.", mName.chars(), ex.getError(), ex.what())); 00071 } else { 00072 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Socket has closed, thread will end.", mName.chars())); 00073 } 00074 } catch (const std::exception& ex) { 00075 if (this->isRunning()) { 00076 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Exiting due to top level exception '%s'.", mName.chars(), ex.what())); 00077 } 00078 } catch (...) { 00079 if (this->isRunning()) { 00080 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Exiting due to top level unknown exception.", mName.chars())); 00081 } 00082 } 00083 00084 if (mSession != nullptr) { 00085 mSession->shutdown(this); 00086 } 00087 00088 if (mDependentInputThread != NULL) { 00089 mDependentInputThread->setHasOutputThread(false); 00090 } 00091 } 00092 00093 void VMessageOutputThread::stop() { 00094 VSocketThread::stop(); 00095 mOutputQueue.wakeUp(); // if it's blocked, this is needed to kick it back to its run loop 00096 } 00097 00098 void VMessageOutputThread::attachSession(VClientSessionPtr session) { 00099 mSession = session; 00100 } 00101 00102 bool VMessageOutputThread::postOutputMessage(VMessagePtr message, bool respectQueueLimits) { 00103 if (respectQueueLimits) { 00104 int currentQueueSize = 0; 00105 Vs64 currentQueueDataSize = 0; 00106 if (! this->isOutputQueueOverLimit(currentQueueSize, currentQueueDataSize)) { 00107 mWasOverLimit = false; 00108 } else { 00109 VInstant now; 00110 bool gracePeriodExceeded = false; 00111 00112 if (mWasOverLimit) { 00113 // Still over limit. Have we exceeded the grace period? 00114 VDuration howLongOverLimit = now - mWhenWentOverLimit; 00115 gracePeriodExceeded = (howLongOverLimit > mMaxQueueGracePeriod); 00116 } else { 00117 // We've just gone over the limit. 00118 // If there is a grace period, note the time. 00119 if (mMaxQueueGracePeriod == VDuration::ZERO()) { 00120 gracePeriodExceeded = true; 00121 } else { 00122 mWhenWentOverLimit = now; 00123 mWasOverLimit = true; 00124 } 00125 } 00126 00127 if (gracePeriodExceeded) { 00128 if (this->isRunning()) { // Only stop() once; we may land here repeatedly under fast queueing, before stop completes. 00129 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Closing socket to shut down session because output queue size of %d messages and " VSTRING_FORMATTER_S64 " bytes is over limit.", 00130 mName.chars(), currentQueueSize, currentQueueDataSize)); 00131 00132 this->stop(); 00133 } 00134 00135 return false; 00136 } else { 00137 if (now - mWhenMaxQueueSizeWarned > VDuration::MINUTE()) { // Throttle the rate of ongoing warnings. 00138 mWhenMaxQueueSizeWarned = now; 00139 VDuration gracePeriodRemaining = (mWhenWentOverLimit + mMaxQueueGracePeriod) - now; 00140 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Posting to queue with excess size of %d messages and " VSTRING_FORMATTER_S64 " bytes. Remaining grace period %d seconds.", 00141 mName.chars(), currentQueueSize, currentQueueDataSize, gracePeriodRemaining.getDurationSeconds())); 00142 } 00143 } 00144 } 00145 } 00146 00147 bool posted = false; 00148 try { 00149 mOutputQueue.postMessage(message); // can throw bad_alloc if out of memory and queue cannot push_back 00150 posted = true; 00151 } catch (...) { 00152 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Closing socket to shut down session because ran out memory.", mName.chars())); 00153 this->stop(); 00154 } 00155 00156 return posted; 00157 } 00158 00159 void VMessageOutputThread::releaseAllQueuedMessages() { 00160 mOutputQueue.releaseAllMessages(); 00161 } 00162 00163 int VMessageOutputThread::getOutputQueueSize() const { 00164 return static_cast<int>(mOutputQueue.getQueueSize()); 00165 } 00166 00167 bool VMessageOutputThread::isOutputQueueOverLimit(int& currentQueueSize, Vs64& currentQueueDataSize) const { 00168 currentQueueSize = static_cast<int>(mOutputQueue.getQueueSize()); 00169 currentQueueDataSize = mOutputQueue.getQueueDataSize(); 00170 00171 return (((mMaxQueueSize != 0) && (currentQueueSize >= mMaxQueueSize)) || 00172 ((mMaxQueueDataSize != 0) && (currentQueueDataSize >= mMaxQueueDataSize))); 00173 } 00174 00175 void VMessageOutputThread::_processNextOutboundMessage() { 00176 VMessagePtr message = mOutputQueue.blockUntilNextMessage(); 00177 00178 if (message == nullptr) { 00179 // OK -- means we were awakened from block but w/o a message actually available 00180 } else { 00181 if (mSession != nullptr) { 00182 mSession->sendMessageToClient(message, mName, mOutputStream); 00183 } else { 00184 // We are just a client. No "session". Just send. 00185 VLOGGER_NAMED_LEVEL(mLoggerName, VMessage::kMessageQueueOpsLevel, VSTRING_FORMAT("[%s] VMessageOutputThread::_processNextOutboundMessage: Sending message@0x%08X.", mName.chars(), message.get())); 00186 message->send(mName, mOutputStream); 00187 } 00188 } 00189 } 00190