Vault  4.1
vclientsession.cpp
00001 /*
00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00008 #include "vclientsession.h"
00009 #include "vtypes_internal.h"
00010 
00011 #include "vmutexlocker.h"
00012 #include "vserver.h"
00013 #include "vlogger.h"
00014 #include "vmessage.h"
00015 #include "vmessageinputthread.h"
00016 #include "vmessageoutputthread.h"
00017 #include "vsocket.h"
00018 #include "vbento.h"
00019 
00020 // VClientSession --------------------------------------------------------------
00021 
00022 VClientSession::VClientSession(const VString& sessionBaseName, VServer* server, const VString& clientType, VSocket* socket, VMessageInputThread* inputThread, VMessageOutputThread* outputThread, const VDuration& standbyTimeLimit, Vs64 maxQueueDataSize)
00023     : VEnableSharedFromThis<VClientSession>()
00024     , mName(sessionBaseName)
00025     , mLoggerName(VSTRING_ARGS("vault.messages.VClientSession.%s.%s", sessionBaseName.chars(), VLogger::getCleansedLoggerName(socket->getHostIPAddress()).chars()))
00026     , mMutex(VString::EMPTY()/*name will be set in body*/)
00027     , mServer(server)
00028     , mClientType(clientType)
00029     , mClientIP(socket->getHostIPAddress())
00030     , mClientPort(socket->getPortNumber())
00031     , mClientAddress()
00032     , mInputThread(inputThread)
00033     , mOutputThread(outputThread)
00034     , mIsShuttingDown(false)
00035     , mStartupStandbyQueue()
00036     , mStandbyStartTime(VInstant::NEVER_OCCURRED())
00037     , mStandbyTimeLimit(standbyTimeLimit)
00038     , mMaxClientQueueDataSize(maxQueueDataSize)
00039     , mSocket(socket)
00040     , mSocketStream(socket, "VClientSession") // FIXME: find a way to get the IP address here or to set in ctor
00041     , mIOStream(mSocketStream)
00042     {
00043     mClientAddress.format("%s:%d", mClientIP.chars(), mClientPort);
00044     mName.format("%s:%s:%d", sessionBaseName.chars(), mClientIP.chars(), mClientPort);
00045     mMutex.setName(VSTRING_FORMAT("VClientSession[%s]::mMutex", mName.chars()));
00046 
00047     if (mServer == NULL) {
00048         VString message(VSTRING_ARGS("[%s] VClientSession: No server specified.", this->getClientAddress().chars()));
00049         VLOGGER_NAMED_ERROR(mLoggerName, message);
00050         throw VStackTraceException(message);
00051     }
00052 }
00053 
00054 VClientSession::~VClientSession() {
00055     try {
00056         this->_releaseQueuedClientMessages();
00057     } catch (...) {}
00058 
00059     mInputThread = NULL;
00060     mOutputThread = NULL;
00061 
00062     delete mSocket;
00063 }
00064 
00065 void VClientSession::initIOThreads() {
00066     if (mInputThread != NULL) {
00067         mInputThread->attachSession(shared_from_this());
00068         mInputThread->start();
00069     }
00070 
00071     if (mOutputThread != NULL) {
00072         mOutputThread->attachSession(shared_from_this());
00073         mOutputThread->start();
00074     }
00075 }
00076 
00077 void VClientSession::shutdown(VThread* callingThread) {
00078     VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::shutdown() %s", this->getName().chars(), (callingThread == NULL ? "" : callingThread->getName().chars())));
00079 
00080     mIsShuttingDown = true;
00081 
00082     if (callingThread == NULL) {
00083         VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Server requested shutdown of VClientSession@0x%08X.", this->getName().chars(), this));
00084     }
00085 
00086     if (mInputThread != NULL) {
00087         if (callingThread == mInputThread) {
00088             mInputThread = NULL;
00089             VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Input Thread [%s] requested shutdown of VClientSession@0x%08X.", this->getName().chars(), callingThread->getName().chars(), this));
00090         } else {
00091             mInputThread->stop();
00092         }
00093     }
00094 
00095     if (mOutputThread != NULL) {
00096         if (callingThread == mOutputThread) {
00097             mOutputThread = NULL;
00098             VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Output Thread [%s] requested shutdown of VClientSession@0x%08X.", this->getName().chars(), callingThread->getName().chars(), this));
00099         } else {
00100             mOutputThread->stop();
00101         }
00102     }
00103 
00104     // Remove this session from the server's lists of active sessions,
00105     // so that it can be garbage collected.
00106     locker.unlock(); // Must release mMutex to avoid possibility of deadlock with a thread that could be posting broadcast right now, which has server lock, needs our lock. removeClientSession may need server lock. Deadlock.
00107     mServer->removeClientSession(shared_from_this());
00108 }
00109 
00110 void VClientSession::postOutputMessage(VMessagePtr message, bool isForBroadcast) {
00111     VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::postOutputMessage()", this->getName().chars())); // protect the mStartupStandbyQueue during queue operations
00112 
00113     // Don't post if client is doing a disconnect:
00114     if (mIsShuttingDown || this->isClientGoingOffline()) {
00115         VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("VClientSession::postOutputMessage: NOT posting message@0x%08X to going-offline session [%s], presumably in process of session shutdown.", message.get(), mClientAddress.chars()));
00116         return;
00117     }
00118 
00119     if (isForBroadcast && ! this->isClientOnline()) {
00120         // This branch is entered only for posting to an offline session (e.g. a session still starting up
00121         // that is not ready to receive normal "posted" messages.
00122         // We either post to the session's standby queue, or if we hit a limit we start killing the session.
00123 
00124         VInstant now;
00125 
00126         if (mStandbyStartTime == VInstant::NEVER_OCCURRED()) {
00127             mStandbyStartTime = now;
00128         }
00129 
00130         Vs64 currentQueueDataSize = mStartupStandbyQueue.getQueueDataSize();
00131         if ((mMaxClientQueueDataSize > 0) && (currentQueueDataSize >= mMaxClientQueueDataSize)) {
00132             // We have hit the queue size limit. Do not post. Initiate a shutdown of this session.
00133             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Reached output queue limit of " VSTRING_FORMATTER_S64 " bytes. Not posting message ID=%d. Closing socket to force shutdown of session and its i/o threads.", this->getName().chars(), mMaxClientQueueDataSize, message->getMessageID()));
00134             mSocket->close();
00135         } else if ((mStandbyTimeLimit == VDuration::ZERO()) || (now <= mStandbyStartTime + mStandbyTimeLimit)) {
00136             VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Placing message ID=%d on standby queue for not-yet-started session.", this->getName().chars(), message->getMessageID()));
00137             mStartupStandbyQueue.postMessage(message);
00138         } else {
00139             // We have hit the standby time limit. Do not post. Initiate a shutdown of this session.
00140             VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Reached standby time limit of %s. Not posting message ID=%d. Closing socket to force shutdown of session and its i/o threads.", this->getName().chars(), mStandbyTimeLimit.getDurationString().chars(), message->getMessageID()));
00141             mSocket->close();
00142         }
00143     } else if (mOutputThread != NULL) {
00144         // This branch is entered only for posting to a session with an async output thread.
00145         // Typical for posting a message directly to 1 session or broadcasting to multiple sessions.
00146         // We need to post to the output thread.
00147         // Note that mOutputThread->postOutputMessage() stops its own thread if posting fails, triggering session end. We don't need to take action.
00148         mOutputThread->postOutputMessage(message);
00149     } else { // no output thread
00150         // Vault 4.0 TODO: This used to be for non-broadcast only, but I'm removing the distinction.
00151         // However, does this change how teardown works? Formerly the other branch (broadcast) treated
00152         // a null output thread as sign that we should close the socket. Hopefully that case is irrelevant
00153         // or will just throw an exception here if the mIOStream is dead. Here's the old comment:
00154 
00155         // This branch is entered for non-broadcast synchronous-session posting. Just send on the socket stream.
00156         // This would only be for sessions that are synchronous and do not use a separate output thread.
00157         // Write the message directly to our output stream and release it.
00158         message->send(this->getName(), mIOStream);
00159     }
00160 
00161 }
00162 
00163 void VClientSession::sendMessageToClient(VMessagePtr message, const VString& sessionLabel, VBinaryIOStream& out) {
00164     // No longer a need to lock mMutex, because session is ref counted and cannot disappear from under us here.
00165     if (mIsShuttingDown || this->isClientGoingOffline()) {
00166         VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("VClientSession::sendMessageToClient: NOT sending message@0x%08X to offline session [%s], presumably in process of session shutdown.", message.get(), mClientAddress.chars()));
00167     } else {
00168         VLOGGER_NAMED_LEVEL(mLoggerName, VMessage::kMessageQueueOpsLevel, VSTRING_FORMAT("[%s] VClientSession::sendMessageToClient: Sending message@0x%08X.", sessionLabel.chars(), message.get()));
00169         message->send(sessionLabel, out);
00170     }
00171 }
00172 
00173 VBentoNode* VClientSession::getSessionInfo() const {
00174     VBentoNode* result = new VBentoNode(mName);
00175 
00176     result->addString("name", mName);
00177     result->addString("type", mClientType);
00178     result->addString("address", mClientAddress);
00179     result->addString("shutting", mIsShuttingDown ? "yes" : "no");
00180 
00181     int standbyQueueSize = (int) mStartupStandbyQueue.getQueueSize();
00182     if (standbyQueueSize != 0) {
00183         result->addInt("standby-queue-size", (int) mStartupStandbyQueue.getQueueSize());
00184         result->addS64("standby-queue-data-size", mStartupStandbyQueue.getQueueDataSize());
00185     }
00186 
00187     if (mOutputThread != NULL) {
00188         result->addInt("output-queue-size", mOutputThread->getOutputQueueSize());
00189     }
00190 
00191     return result;
00192 }
00193 
00194 void VClientSession::_moveStandbyMessagesToAsyncOutputQueue() {
00195     // Note that we rely on the caller to lock the mMutex before calling us.
00196     // changeInitalizationState calls us but needs to lock a larger scope,
00197     // so we don't want to do the locking.
00198     VMessagePtr m = mStartupStandbyQueue.getNextMessage();
00199 
00200     while (m != nullptr) {
00201         VLOGGER_NAMED_TRACE(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::_moveStandbyMessagesToAsyncOutputQueue: Moving message message@0x%08X from standby queue to output queue.", this->getName().chars(), m.get()));
00202         this->_postStandbyMessageToAsyncOutputQueue(m);
00203         m = mStartupStandbyQueue.getNextMessage();
00204     }
00205 
00206     mStandbyStartTime = VInstant::NEVER_OCCURRED(); // We are no longer in standby queuing mode (until next time we queue).
00207 }
00208 
00209 int VClientSession::_getOutputQueueSize() const {
00210     return (mOutputThread == NULL) ? 0 : mOutputThread->getOutputQueueSize();
00211 }
00212 
00213 void VClientSession::_postStandbyMessageToAsyncOutputQueue(VMessagePtr message) {
00214     mOutputThread->postOutputMessage(message, false /* do not respect the queue limits, just move all messages onto the queue */);
00215 }
00216 
00217 void VClientSession::_releaseQueuedClientMessages() {
00218     VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::_releaseQueuedClientMessages()", this->getName().chars())); // protect the mStartupStandbyQueue during queue operations
00219 
00220     // Order probably does not matter, but it makes sense to pop them in the order they would have been sent.
00221 
00222     if (mOutputThread != NULL) {
00223         mOutputThread->releaseAllQueuedMessages();
00224     }
00225 
00226     mStartupStandbyQueue.releaseAllMessages();
00227 }
00228 
00229 // VClientSessionFactory -----------------------------------------------------------
00230 
00231 void VClientSessionFactory::addSessionToServer(VClientSessionPtr session) {
00232     if (mServer != NULL) {
00233         mServer->addClientSession(session);
00234     }
00235 }

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.