Vault  4.1
vmessageoutputthread.cpp
00001 /*
00002 Copyright (c) 1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00008 #include "vmessageoutputthread.h"
00009 
00010 #include "vexception.h"
00011 #include "vclientsession.h"
00012 #include "vsocket.h"
00013 #include "vmessageinputthread.h"
00014 #include "vlogger.h"
00015 
00016 // VMessageOutputThread -------------------------------------------------------
00017 
00018 VMessageOutputThread::VMessageOutputThread(const VString& threadBaseName, VSocket* socket, VListenerThread* ownerThread, VServer* server, VClientSessionPtr session, VMessageInputThread* dependentInputThread, int maxQueueSize, Vs64 maxQueueDataSize, const VDuration& maxQueueGracePeriod)
00019     : VSocketThread(threadBaseName, socket, ownerThread)
00020     , mOutputQueue()
00021     , mSocketStream(socket, "VMessageOutputThread")
00022     , mOutputStream(mSocketStream)
00023     , mServer(server)
00024     , mSession(session)
00025     , mDependentInputThread(dependentInputThread)
00026     , mMaxQueueSize(maxQueueSize)
00027     , mMaxQueueDataSize(maxQueueDataSize)
00028     , mMaxQueueGracePeriod(maxQueueGracePeriod)
00029     , mWhenMaxQueueSizeWarned(VInstant() - VDuration::MINUTE()) // one minute ago (past warning throttle threshold)
00030     , mWasOverLimit(false)
00031     , mWhenWentOverLimit(VInstant::NEVER_OCCURRED())
00032     {
00033 
00034     if (mDependentInputThread != NULL) {
00035         mDependentInputThread->setHasOutputThread(true);
00036     }
00037 }
00038 
00039 VMessageOutputThread::~VMessageOutputThread() {
00040     mOutputQueue.releaseAllMessages();
00041 
00042     /*
00043     We share the socket w/ the input thread. We sort of let the input
00044     thread be the owner. So we don't want our superclass to see
00045     mSocket and clean it up. Just set it to NULL so that the other
00046     class will be the one to do so.
00047     */
00048     mSocket = NULL;
00049 
00050     mServer = NULL;
00051     mDependentInputThread = NULL;
00052 }
00053 
00054 void VMessageOutputThread::run() {
00055     try {
00056         while (this->isRunning()) {
00057             this->_processNextOutboundMessage();
00058         }
00059     } catch (const VSocketClosedException& /*ex*/) {
00060         VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Socket has closed, thread will end.", mName.chars()));
00061     } catch (const VException& ex) {
00062         /*
00063         Unlike the input threads, we shouldn't normally get an EOF exception to indicate that the
00064         connection has been closed normally, because we are an output thread. So any exceptions
00065         that land here uncaught are socket i/o errors and are logged as such. However, if our thread
00066         has been told to stop -- is no longer in running state -- then exceptions due to the socket
00067         being closed programmatically are to be expected, so we check that before logging an error.
00068         */
00069         if (this->isRunning()) {
00070             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::run: Exiting due to top level exception #%d '%s'.", mName.chars(), ex.getError(), ex.what()));
00071         } else {
00072             VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Socket has closed, thread will end.", mName.chars()));
00073         }
00074     } catch (const std::exception& ex) {
00075         if (this->isRunning()) {
00076             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Exiting due to top level exception '%s'.", mName.chars(), ex.what()));
00077         }
00078     } catch (...) {
00079         if (this->isRunning()) {
00080             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread: Exiting due to top level unknown exception.", mName.chars()));
00081         }
00082     }
00083 
00084     if (mSession != nullptr) {
00085         mSession->shutdown(this);
00086     }
00087 
00088     if (mDependentInputThread != NULL) {
00089         mDependentInputThread->setHasOutputThread(false);
00090     }
00091 }
00092 
00093 void VMessageOutputThread::stop() {
00094     VSocketThread::stop();
00095     mOutputQueue.wakeUp(); // if it's blocked, this is needed to kick it back to its run loop
00096 }
00097 
00098 void VMessageOutputThread::attachSession(VClientSessionPtr session) {
00099     mSession = session;
00100 }
00101 
00102 bool VMessageOutputThread::postOutputMessage(VMessagePtr message, bool respectQueueLimits) {
00103     if (respectQueueLimits) {
00104         int currentQueueSize = 0;
00105         Vs64 currentQueueDataSize = 0;
00106         if (! this->isOutputQueueOverLimit(currentQueueSize, currentQueueDataSize)) {
00107             mWasOverLimit = false;
00108         } else {
00109             VInstant now;
00110             bool gracePeriodExceeded = false;
00111 
00112             if (mWasOverLimit) {
00113                 // Still over limit. Have we exceeded the grace period?
00114                 VDuration howLongOverLimit = now - mWhenWentOverLimit;
00115                 gracePeriodExceeded = (howLongOverLimit > mMaxQueueGracePeriod);
00116             } else {
00117                 // We've just gone over the limit.
00118                 // If there is a grace period, note the time.
00119                 if (mMaxQueueGracePeriod == VDuration::ZERO()) {
00120                     gracePeriodExceeded = true;
00121                 } else {
00122                     mWhenWentOverLimit = now;
00123                     mWasOverLimit = true;
00124                 }
00125             }
00126 
00127             if (gracePeriodExceeded) {
00128                 if (this->isRunning()) { // Only stop() once; we may land here repeatedly under fast queueing, before stop completes.
00129                     VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Closing socket to shut down session because output queue size of %d messages and " VSTRING_FORMATTER_S64 " bytes is over limit.",
00130                                                  mName.chars(), currentQueueSize, currentQueueDataSize));
00131 
00132                     this->stop();
00133                 }
00134 
00135                 return false;
00136             } else {
00137                 if (now - mWhenMaxQueueSizeWarned > VDuration::MINUTE()) { // Throttle the rate of ongoing warnings.
00138                     mWhenMaxQueueSizeWarned = now;
00139                     VDuration gracePeriodRemaining = (mWhenWentOverLimit + mMaxQueueGracePeriod) - now;
00140                     VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Posting to queue with excess size of %d messages and " VSTRING_FORMATTER_S64 " bytes. Remaining grace period %d seconds.",
00141                                                 mName.chars(), currentQueueSize, currentQueueDataSize, gracePeriodRemaining.getDurationSeconds()));
00142                 }
00143             }
00144         }
00145     }
00146 
00147     bool posted = false;
00148     try {
00149         mOutputQueue.postMessage(message); // can throw bad_alloc if out of memory and queue cannot push_back
00150         posted = true;
00151     } catch (...) {
00152         VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VMessageOutputThread::postOutputMessage: Closing socket to shut down session because ran out memory.", mName.chars()));
00153         this->stop();
00154     }
00155 
00156     return posted;
00157 }
00158 
00159 void VMessageOutputThread::releaseAllQueuedMessages() {
00160     mOutputQueue.releaseAllMessages();
00161 }
00162 
00163 int VMessageOutputThread::getOutputQueueSize() const {
00164     return static_cast<int>(mOutputQueue.getQueueSize());
00165 }
00166 
00167 bool VMessageOutputThread::isOutputQueueOverLimit(int& currentQueueSize, Vs64& currentQueueDataSize) const {
00168     currentQueueSize = static_cast<int>(mOutputQueue.getQueueSize());
00169     currentQueueDataSize = mOutputQueue.getQueueDataSize();
00170 
00171     return (((mMaxQueueSize != 0) && (currentQueueSize >= mMaxQueueSize)) ||
00172             ((mMaxQueueDataSize != 0) && (currentQueueDataSize >= mMaxQueueDataSize)));
00173 }
00174 
00175 void VMessageOutputThread::_processNextOutboundMessage() {
00176     VMessagePtr message = mOutputQueue.blockUntilNextMessage();
00177 
00178     if (message == nullptr) {
00179         // OK -- means we were awakened from block but w/o a message actually available
00180     } else {
00181         if (mSession != nullptr) {
00182             mSession->sendMessageToClient(message, mName, mOutputStream);
00183         } else {
00184             // We are just a client. No "session". Just send.
00185             VLOGGER_NAMED_LEVEL(mLoggerName, VMessage::kMessageQueueOpsLevel, VSTRING_FORMAT("[%s] VMessageOutputThread::_processNextOutboundMessage: Sending message@0x%08X.", mName.chars(), message.get()));
00186             message->send(mName, mOutputStream);
00187         }
00188     }
00189 }
00190 

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.