Vault
4.1
|
00001 /* 00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved. 00003 This file is part of the Code Vault version 4.1 00004 http://www.bombaydigital.com/ 00005 License: MIT. See LICENSE.md in the Vault top level directory. 00006 */ 00007 00008 #include "vclientsession.h" 00009 #include "vtypes_internal.h" 00010 00011 #include "vmutexlocker.h" 00012 #include "vserver.h" 00013 #include "vlogger.h" 00014 #include "vmessage.h" 00015 #include "vmessageinputthread.h" 00016 #include "vmessageoutputthread.h" 00017 #include "vsocket.h" 00018 #include "vbento.h" 00019 00020 // VClientSession -------------------------------------------------------------- 00021 00022 VClientSession::VClientSession(const VString& sessionBaseName, VServer* server, const VString& clientType, VSocket* socket, VMessageInputThread* inputThread, VMessageOutputThread* outputThread, const VDuration& standbyTimeLimit, Vs64 maxQueueDataSize) 00023 : VEnableSharedFromThis<VClientSession>() 00024 , mName(sessionBaseName) 00025 , mLoggerName(VSTRING_ARGS("vault.messages.VClientSession.%s.%s", sessionBaseName.chars(), VLogger::getCleansedLoggerName(socket->getHostIPAddress()).chars())) 00026 , mMutex(VString::EMPTY()/*name will be set in body*/) 00027 , mServer(server) 00028 , mClientType(clientType) 00029 , mClientIP(socket->getHostIPAddress()) 00030 , mClientPort(socket->getPortNumber()) 00031 , mClientAddress() 00032 , mInputThread(inputThread) 00033 , mOutputThread(outputThread) 00034 , mIsShuttingDown(false) 00035 , mStartupStandbyQueue() 00036 , mStandbyStartTime(VInstant::NEVER_OCCURRED()) 00037 , mStandbyTimeLimit(standbyTimeLimit) 00038 , mMaxClientQueueDataSize(maxQueueDataSize) 00039 , mSocket(socket) 00040 , mSocketStream(socket, "VClientSession") // FIXME: find a way to get the IP address here or to set in ctor 00041 , mIOStream(mSocketStream) 00042 { 00043 mClientAddress.format("%s:%d", mClientIP.chars(), mClientPort); 00044 mName.format("%s:%s:%d", sessionBaseName.chars(), mClientIP.chars(), mClientPort); 00045 mMutex.setName(VSTRING_FORMAT("VClientSession[%s]::mMutex", mName.chars())); 00046 00047 if (mServer == NULL) { 00048 VString message(VSTRING_ARGS("[%s] VClientSession: No server specified.", this->getClientAddress().chars())); 00049 VLOGGER_NAMED_ERROR(mLoggerName, message); 00050 throw VStackTraceException(message); 00051 } 00052 } 00053 00054 VClientSession::~VClientSession() { 00055 try { 00056 this->_releaseQueuedClientMessages(); 00057 } catch (...) {} 00058 00059 mInputThread = NULL; 00060 mOutputThread = NULL; 00061 00062 delete mSocket; 00063 } 00064 00065 void VClientSession::initIOThreads() { 00066 if (mInputThread != NULL) { 00067 mInputThread->attachSession(shared_from_this()); 00068 mInputThread->start(); 00069 } 00070 00071 if (mOutputThread != NULL) { 00072 mOutputThread->attachSession(shared_from_this()); 00073 mOutputThread->start(); 00074 } 00075 } 00076 00077 void VClientSession::shutdown(VThread* callingThread) { 00078 VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::shutdown() %s", this->getName().chars(), (callingThread == NULL ? "" : callingThread->getName().chars()))); 00079 00080 mIsShuttingDown = true; 00081 00082 if (callingThread == NULL) { 00083 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Server requested shutdown of VClientSession@0x%08X.", this->getName().chars(), this)); 00084 } 00085 00086 if (mInputThread != NULL) { 00087 if (callingThread == mInputThread) { 00088 mInputThread = NULL; 00089 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Input Thread [%s] requested shutdown of VClientSession@0x%08X.", this->getName().chars(), callingThread->getName().chars(), this)); 00090 } else { 00091 mInputThread->stop(); 00092 } 00093 } 00094 00095 if (mOutputThread != NULL) { 00096 if (callingThread == mOutputThread) { 00097 mOutputThread = NULL; 00098 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::shutdown: Output Thread [%s] requested shutdown of VClientSession@0x%08X.", this->getName().chars(), callingThread->getName().chars(), this)); 00099 } else { 00100 mOutputThread->stop(); 00101 } 00102 } 00103 00104 // Remove this session from the server's lists of active sessions, 00105 // so that it can be garbage collected. 00106 locker.unlock(); // Must release mMutex to avoid possibility of deadlock with a thread that could be posting broadcast right now, which has server lock, needs our lock. removeClientSession may need server lock. Deadlock. 00107 mServer->removeClientSession(shared_from_this()); 00108 } 00109 00110 void VClientSession::postOutputMessage(VMessagePtr message, bool isForBroadcast) { 00111 VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::postOutputMessage()", this->getName().chars())); // protect the mStartupStandbyQueue during queue operations 00112 00113 // Don't post if client is doing a disconnect: 00114 if (mIsShuttingDown || this->isClientGoingOffline()) { 00115 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("VClientSession::postOutputMessage: NOT posting message@0x%08X to going-offline session [%s], presumably in process of session shutdown.", message.get(), mClientAddress.chars())); 00116 return; 00117 } 00118 00119 if (isForBroadcast && ! this->isClientOnline()) { 00120 // This branch is entered only for posting to an offline session (e.g. a session still starting up 00121 // that is not ready to receive normal "posted" messages. 00122 // We either post to the session's standby queue, or if we hit a limit we start killing the session. 00123 00124 VInstant now; 00125 00126 if (mStandbyStartTime == VInstant::NEVER_OCCURRED()) { 00127 mStandbyStartTime = now; 00128 } 00129 00130 Vs64 currentQueueDataSize = mStartupStandbyQueue.getQueueDataSize(); 00131 if ((mMaxClientQueueDataSize > 0) && (currentQueueDataSize >= mMaxClientQueueDataSize)) { 00132 // We have hit the queue size limit. Do not post. Initiate a shutdown of this session. 00133 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Reached output queue limit of " VSTRING_FORMATTER_S64 " bytes. Not posting message ID=%d. Closing socket to force shutdown of session and its i/o threads.", this->getName().chars(), mMaxClientQueueDataSize, message->getMessageID())); 00134 mSocket->close(); 00135 } else if ((mStandbyTimeLimit == VDuration::ZERO()) || (now <= mStandbyStartTime + mStandbyTimeLimit)) { 00136 VLOGGER_NAMED_DEBUG(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Placing message ID=%d on standby queue for not-yet-started session.", this->getName().chars(), message->getMessageID())); 00137 mStartupStandbyQueue.postMessage(message); 00138 } else { 00139 // We have hit the standby time limit. Do not post. Initiate a shutdown of this session. 00140 VLOGGER_NAMED_ERROR(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::postOutputMessage: Reached standby time limit of %s. Not posting message ID=%d. Closing socket to force shutdown of session and its i/o threads.", this->getName().chars(), mStandbyTimeLimit.getDurationString().chars(), message->getMessageID())); 00141 mSocket->close(); 00142 } 00143 } else if (mOutputThread != NULL) { 00144 // This branch is entered only for posting to a session with an async output thread. 00145 // Typical for posting a message directly to 1 session or broadcasting to multiple sessions. 00146 // We need to post to the output thread. 00147 // Note that mOutputThread->postOutputMessage() stops its own thread if posting fails, triggering session end. We don't need to take action. 00148 mOutputThread->postOutputMessage(message); 00149 } else { // no output thread 00150 // Vault 4.0 TODO: This used to be for non-broadcast only, but I'm removing the distinction. 00151 // However, does this change how teardown works? Formerly the other branch (broadcast) treated 00152 // a null output thread as sign that we should close the socket. Hopefully that case is irrelevant 00153 // or will just throw an exception here if the mIOStream is dead. Here's the old comment: 00154 00155 // This branch is entered for non-broadcast synchronous-session posting. Just send on the socket stream. 00156 // This would only be for sessions that are synchronous and do not use a separate output thread. 00157 // Write the message directly to our output stream and release it. 00158 message->send(this->getName(), mIOStream); 00159 } 00160 00161 } 00162 00163 void VClientSession::sendMessageToClient(VMessagePtr message, const VString& sessionLabel, VBinaryIOStream& out) { 00164 // No longer a need to lock mMutex, because session is ref counted and cannot disappear from under us here. 00165 if (mIsShuttingDown || this->isClientGoingOffline()) { 00166 VLOGGER_NAMED_WARN(mLoggerName, VSTRING_FORMAT("VClientSession::sendMessageToClient: NOT sending message@0x%08X to offline session [%s], presumably in process of session shutdown.", message.get(), mClientAddress.chars())); 00167 } else { 00168 VLOGGER_NAMED_LEVEL(mLoggerName, VMessage::kMessageQueueOpsLevel, VSTRING_FORMAT("[%s] VClientSession::sendMessageToClient: Sending message@0x%08X.", sessionLabel.chars(), message.get())); 00169 message->send(sessionLabel, out); 00170 } 00171 } 00172 00173 VBentoNode* VClientSession::getSessionInfo() const { 00174 VBentoNode* result = new VBentoNode(mName); 00175 00176 result->addString("name", mName); 00177 result->addString("type", mClientType); 00178 result->addString("address", mClientAddress); 00179 result->addString("shutting", mIsShuttingDown ? "yes" : "no"); 00180 00181 int standbyQueueSize = (int) mStartupStandbyQueue.getQueueSize(); 00182 if (standbyQueueSize != 0) { 00183 result->addInt("standby-queue-size", (int) mStartupStandbyQueue.getQueueSize()); 00184 result->addS64("standby-queue-data-size", mStartupStandbyQueue.getQueueDataSize()); 00185 } 00186 00187 if (mOutputThread != NULL) { 00188 result->addInt("output-queue-size", mOutputThread->getOutputQueueSize()); 00189 } 00190 00191 return result; 00192 } 00193 00194 void VClientSession::_moveStandbyMessagesToAsyncOutputQueue() { 00195 // Note that we rely on the caller to lock the mMutex before calling us. 00196 // changeInitalizationState calls us but needs to lock a larger scope, 00197 // so we don't want to do the locking. 00198 VMessagePtr m = mStartupStandbyQueue.getNextMessage(); 00199 00200 while (m != nullptr) { 00201 VLOGGER_NAMED_TRACE(mLoggerName, VSTRING_FORMAT("[%s] VClientSession::_moveStandbyMessagesToAsyncOutputQueue: Moving message message@0x%08X from standby queue to output queue.", this->getName().chars(), m.get())); 00202 this->_postStandbyMessageToAsyncOutputQueue(m); 00203 m = mStartupStandbyQueue.getNextMessage(); 00204 } 00205 00206 mStandbyStartTime = VInstant::NEVER_OCCURRED(); // We are no longer in standby queuing mode (until next time we queue). 00207 } 00208 00209 int VClientSession::_getOutputQueueSize() const { 00210 return (mOutputThread == NULL) ? 0 : mOutputThread->getOutputQueueSize(); 00211 } 00212 00213 void VClientSession::_postStandbyMessageToAsyncOutputQueue(VMessagePtr message) { 00214 mOutputThread->postOutputMessage(message, false /* do not respect the queue limits, just move all messages onto the queue */); 00215 } 00216 00217 void VClientSession::_releaseQueuedClientMessages() { 00218 VMutexLocker locker(&mMutex, VSTRING_FORMAT("[%s]VClientSession::_releaseQueuedClientMessages()", this->getName().chars())); // protect the mStartupStandbyQueue during queue operations 00219 00220 // Order probably does not matter, but it makes sense to pop them in the order they would have been sent. 00221 00222 if (mOutputThread != NULL) { 00223 mOutputThread->releaseAllQueuedMessages(); 00224 } 00225 00226 mStartupStandbyQueue.releaseAllMessages(); 00227 } 00228 00229 // VClientSessionFactory ----------------------------------------------------------- 00230 00231 void VClientSessionFactory::addSessionToServer(VClientSessionPtr session) { 00232 if (mServer != NULL) { 00233 mServer->addClientSession(session); 00234 } 00235 }