00001 /* 00002 * threadsafe_queue.h 00003 * 00004 * Copyright (C) 2007,2009,2010 Thomas A. Vaughan 00005 * All rights reserved. 00006 * 00007 * 00008 * Redistribution and use in source and binary forms, with or without 00009 * modification, are permitted provided that the following conditions are met: 00010 * * Redistributions of source code must retain the above copyright 00011 * notice, this list of conditions and the following disclaimer. 00012 * * Redistributions in binary form must reproduce the above copyright 00013 * notice, this list of conditions and the following disclaimer in the 00014 * documentation and/or other materials provided with the distribution. 00015 * * Neither the name of the <organization> nor the 00016 * names of its contributors may be used to endorse or promote products 00017 * derived from this software without specific prior written permission. 00018 * 00019 * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY 00020 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 00021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00022 * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY 00023 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 00024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 00025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 00026 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 00027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 00028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00029 * 00030 * 00031 * Implementation of a simple threadsafe queue. 00032 */ 00033 00034 #ifndef WAVEPACKET_THREADSAFE_QUEUE_H__ 00035 #define WAVEPACKET_THREADSAFE_QUEUE_H__ 00036 00037 // includes -------------------------------------------------------------------- 00038 #include "smart_mutex.h" 00039 00040 #include <deque> 00041 00042 00043 ///\ingroup threadsafe 00044 /*@{*/ 00045 00046 00047 /// A threadsafe FIFO queue. Supports basic push/pop operations, and iteration. 00048 template <class T> 00049 class threadsafe_queue { 00050 private: 00051 // private typedefs ---------------------------------------------------- 00052 typedef typename std::deque<T>::iterator iterator; 00053 typedef threadsafe_queue<T> queue_t; 00054 00055 public: 00056 // public typedefs ----------------------------------------------------- 00057 class iterator_t { 00058 public: 00059 friend class threadsafe_queue<T>; 00060 private: 00061 iterator iter; 00062 dword_t rvn; 00063 }; 00064 00065 /// push an element onto the back of the queue 00066 void push_back(IN T& t) { 00067 mlock l(m_mutex); 00068 ++m_rvn; // queue has changed! increment version 00069 m_queue.push_back(t); 00070 } 00071 00072 /// pop an element from the front of the queue. 00073 /// Returns false if the queue is empty 00074 bool pop_front(OUT T& t) { 00075 bool retval = false; 00076 mlock l(m_mutex); 00077 if (m_queue.size()) { 00078 ++m_rvn; // queue has changed! 00079 retval = true; 00080 t = m_queue.front(); 00081 m_queue.pop_front(); 00082 } 00083 return retval; 00084 } 00085 00086 /// returns the number of elements in the queue 00087 int size(void) const throw() { 00088 queue_t * pThis = this->getNonConst(); 00089 mlock l(pThis->m_mutex); 00090 return pThis->m_queue.size(); 00091 } 00092 00093 /// resets the given iterator to point to the beginning of the queue 00094 void getIterator(OUT iterator_t& i) const throw() { 00095 queue_t * pThis = this->getNonConst(); 00096 mlock l(pThis->m_mutex); 00097 i.rvn = m_rvn; 00098 i.iter = pThis->m_queue.begin(); 00099 } 00100 00101 /// gets element pointed to by iterator, and increments iterator 00102 bool getNextElement(IO iterator_t& i, OUT T& t) const { 00103 queue_t * pThis = this->getNonConst(); 00104 mlock l(pThis->m_mutex); 00105 if (i.rvn != m_rvn) { 00106 return false; // iterator has been invalidated 00107 } 00108 if (pThis->m_queue.end() == i.iter) { 00109 return false; // already at end of queue 00110 } 00111 00112 // iterator is valid! get element and increment 00113 t = *i.iter; 00114 ++i.iter; 00115 return true; 00116 } 00117 00118 private: 00119 // private helper methods ---------------------------------------------- 00120 // really ugly method to retrieve a non-const pointer. This is because 00121 // read-only APIs need to take the mutex, even though they 00122 // otherwise won't modify the collection 00123 queue_t * getNonConst(void) const throw() { 00124 return (queue_t *) this; 00125 } 00126 00127 // private member data ------------------------------------------------- 00128 smart_mutex m_mutex; 00129 std::deque<T> m_queue; 00130 dword_t m_rvn; // record version number 00131 }; 00132 00133 00134 #endif // WAVEPACKET_THREADSAFE_QUEUE_H__ 00135