Vault  4.1
vmessagequeue.cpp
00001 /*
00002 Copyright c1997-2014 Trygve Isaacson. All rights reserved.
00003 This file is part of the Code Vault version 4.1
00004 http://www.bombaydigital.com/
00005 License: MIT. See LICENSE.md in the Vault top level directory.
00006 */
00007 
00008 #include "vmessagequeue.h"
00009 
00010 #include "vmutexlocker.h"
00011 #include "vmessage.h"
00012 #include "vlogger.h"
00013 
00014 // VMessageQueue --------------------------------------------------------------
00015 
00016 VDuration VMessageQueue::gVMessageQueueLagLoggingThreshold(-1 * VDuration::MILLISECOND()); // -1 means we don't examine the lag time at all
00017 int VMessageQueue::gVMessageQueueLagLoggingLevel(VLoggerLevel::DEBUG);
00018 
00019 VMessageQueue::VMessageQueue()
00020     : mQueuedMessages()
00021     , mQueuedMessagesDataSize(0)
00022     , mMessageQueueMutex("VMessageQueue::mMessageQueueMutex")
00023     , mMessageQueueSemaphore()
00024     , mLastMessagePostTime()
00025     {
00026 }
00027 
00028 VMessageQueue::~VMessageQueue() {
00029     // 4.0: VMessagePtr means no more need to manually release mQueuedMessages contents.
00030 }
00031 
00032 void VMessageQueue::postMessage(VMessagePtr message) {
00033     VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::postMessage()");
00034 
00035     mQueuedMessages.push_back(message);
00036     mLastMessagePostTime.setNow();
00037 
00038     if (message != nullptr) {
00039         mQueuedMessagesDataSize += message->getMessageDataLength();
00040     }
00041 
00042     locker.unlock();    // otherwise signal() will deadlock
00043     mMessageQueueSemaphore.signal();
00044 }
00045 
00046 VMessagePtr VMessageQueue::blockUntilNextMessage() {
00047     // If there is a message on the queue, we can simply return it.
00048     VMessagePtr message = this->getNextMessage();
00049     if (message != nullptr) {
00050         return message;
00051     }
00052 
00053     // There is nothing on the queue, so wait until someone posts a message.
00054     VMutex dummy("VMessageQueue::blockUntilNextMessage() dummy");
00055     mMessageQueueSemaphore.wait(&dummy, 5 * VDuration::SECOND());
00056 
00057     return this->getNextMessage();
00058 }
00059 
00060 VMessagePtr VMessageQueue::getNextMessage() {
00061     VMessagePtr message;
00062 
00063     VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::getNextMessage()");
00064 
00065     if (mQueuedMessages.size() > 0) {
00066         message = mQueuedMessages.front();
00067         mQueuedMessages.pop_front();
00068 
00069         if (message != nullptr) {
00070             mQueuedMessagesDataSize -= message->getMessageDataLength();
00071         }
00072 
00073     }
00074 
00075     if ((message != nullptr) && (gVMessageQueueLagLoggingThreshold >= VDuration::ZERO())) {
00076         VInstant now;
00077         VDuration delayInterval = now - mLastMessagePostTime;
00078         if (delayInterval >= gVMessageQueueLagLoggingThreshold) {
00079             VLOGGER_NAMED_LEVEL("vault.messages.VMessageQueue", gVMessageQueueLagLoggingLevel, VSTRING_FORMAT("VMessageQueue saw a delay of %s when getting a message with ID %d.", delayInterval.getDurationString().chars(), message->getMessageID()));
00080         }
00081     }
00082 
00083     return message;
00084 }
00085 
00086 void VMessageQueue::wakeUp() {
00087     mMessageQueueSemaphore.signal();
00088 }
00089 
00090 VSizeType VMessageQueue::getQueueSize() const {
00091     // No need to lock here, nothing bad can happen underneath us.
00092     return mQueuedMessages.size();
00093 }
00094 
00095 Vs64 VMessageQueue::getQueueDataSize() const {
00096     // No need to lock here, nothing bad can happen underneath us.
00097     return mQueuedMessagesDataSize;
00098 }
00099 
00100 void VMessageQueue::releaseAllMessages() {
00101     VMutexLocker locker(&mMessageQueueMutex, "VMessageQueue::releaseAllMessages()");
00102 
00103     while (mQueuedMessages.size() > 0) {
00104         VMessagePtr message = mQueuedMessages.front();
00105         mQueuedMessages.pop_front();
00106 
00107         if (message != nullptr) {
00108             mQueuedMessagesDataSize -= message->getMessageDataLength();
00109         }
00110     }
00111 }
00112 

Copyright ©1997-2014 Trygve Isaacson. All rights reserved. This documentation was generated with Doxygen.