Vault
4.1
|
00001 /* 00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved. 00003 This file is part of the Code Vault version 4.1 00004 http://www.bombaydigital.com/ 00005 License: MIT. See LICENSE.md in the Vault top level directory. 00006 */ 00007 00008 #include "vmessagequeue.h" 00009 00010 #include "vmutexlocker.h" 00011 #include "vmessage.h" 00012 #include "vlogger.h" 00013 00014 // VMessageQueue -------------------------------------------------------------- 00015 00016 VDuration VMessageQueue::gVMessageQueueLagLoggingThreshold(-1 * VDuration::MILLISECOND()); // -1 means we don't examine the lag time at all 00017 int VMessageQueue::gVMessageQueueLagLoggingLevel(VLoggerLevel::DEBUG); 00018 00019 VMessageQueue::VMessageQueue() 00020 : mQueuedMessages() 00021 , mQueuedMessagesDataSize(0) 00022 , mMessageQueueMutex("VMessageQueue::mMessageQueueMutex") 00023 , mMessageQueueSemaphore() 00024 , mLastMessagePostTime() 00025 { 00026 } 00027 00028 VMessageQueue::~VMessageQueue() { 00029 // 4.0: VMessagePtr means no more need to manually release mQueuedMessages contents. 00030 } 00031 00032 void VMessageQueue::postMessage(VMessagePtr message) { 00033 VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::postMessage()"); 00034 00035 mQueuedMessages.push_back(message); 00036 mLastMessagePostTime.setNow(); 00037 00038 if (message != nullptr) { 00039 mQueuedMessagesDataSize += message->getMessageDataLength(); 00040 } 00041 00042 locker.unlock(); // otherwise signal() will deadlock 00043 mMessageQueueSemaphore.signal(); 00044 } 00045 00046 VMessagePtr VMessageQueue::blockUntilNextMessage() { 00047 // If there is a message on the queue, we can simply return it. 00048 VMessagePtr message = this->getNextMessage(); 00049 if (message != nullptr) { 00050 return message; 00051 } 00052 00053 // There is nothing on the queue, so wait until someone posts a message. 00054 VMutex dummy("VMessageQueue::blockUntilNextMessage() dummy"); 00055 mMessageQueueSemaphore.wait(&dummy, 5 * VDuration::SECOND()); 00056 00057 return this->getNextMessage(); 00058 } 00059 00060 VMessagePtr VMessageQueue::getNextMessage() { 00061 VMessagePtr message; 00062 00063 VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::getNextMessage()"); 00064 00065 if (mQueuedMessages.size() > 0) { 00066 message = mQueuedMessages.front(); 00067 mQueuedMessages.pop_front(); 00068 00069 if (message != nullptr) { 00070 mQueuedMessagesDataSize -= message->getMessageDataLength(); 00071 } 00072 00073 } 00074 00075 if ((message != nullptr) && (gVMessageQueueLagLoggingThreshold >= VDuration::ZERO())) { 00076 VInstant now; 00077 VDuration delayInterval = now - mLastMessagePostTime; 00078 if (delayInterval >= gVMessageQueueLagLoggingThreshold) { 00079 VLOGGER_NAMED_LEVEL("vault.messages.VMessageQueue", gVMessageQueueLagLoggingLevel, VSTRING_FORMAT("VMessageQueue saw a delay of %s when getting a message with ID %d.", delayInterval.getDurationString().chars(), message->getMessageID())); 00080 } 00081 } 00082 00083 return message; 00084 } 00085 00086 void VMessageQueue::wakeUp() { 00087 mMessageQueueSemaphore.signal(); 00088 } 00089 00090 VSizeType VMessageQueue::getQueueSize() const { 00091 // No need to lock here, nothing bad can happen underneath us. 00092 return mQueuedMessages.size(); 00093 } 00094 00095 Vs64 VMessageQueue::getQueueDataSize() const { 00096 // No need to lock here, nothing bad can happen underneath us. 00097 return mQueuedMessagesDataSize; 00098 } 00099 00100 void VMessageQueue::releaseAllMessages() { 00101 VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::releaseAllMessages()"); 00102 00103 while (mQueuedMessages.size() > 0) { 00104 VMessagePtr message = mQueuedMessages.front(); 00105 mQueuedMessages.pop_front(); 00106 00107 if (message != nullptr) { 00108 mQueuedMessagesDataSize -= message->getMessageDataLength(); 00109 } 00110 } 00111 } 00112