threadsafe_queue.h

Go to the documentation of this file.
00001 /*
00002  * threadsafe_queue.h
00003  * 
00004  * Copyright (C) 2007,2009,2010  Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  *
00031  * Implementation of a simple threadsafe queue.
00032  */
00033 
00034 #ifndef WAVEPACKET_THREADSAFE_QUEUE_H__
00035 #define WAVEPACKET_THREADSAFE_QUEUE_H__
00036 
00037 // includes --------------------------------------------------------------------
00038 #include "smart_mutex.h"
00039 
00040 #include <deque>
00041 
00042 
00043 ///\ingroup threadsafe
00044 /*@{*/
00045 
00046 
00047 /// A threadsafe FIFO queue.  Supports basic push/pop operations, and iteration.
00048 template <class T>
00049 class threadsafe_queue {
00050 private:
00051         // private typedefs ----------------------------------------------------
00052         typedef typename std::deque<T>::iterator        iterator;
00053         typedef threadsafe_queue<T>                     queue_t;
00054 
00055 public:
00056         // public typedefs -----------------------------------------------------
00057         class iterator_t {
00058         public:
00059                 friend class threadsafe_queue<T>;
00060         private:
00061                 iterator                iter;
00062                 dword_t                 rvn;
00063         };
00064 
00065         /// push an element onto the back of the queue
00066         void push_back(IN T& t) {
00067                         mlock l(m_mutex);
00068                         ++m_rvn;        // queue has changed!  increment version
00069                         m_queue.push_back(t);
00070                 }
00071 
00072         /// pop an element from the front of the queue.
00073         /// Returns false if the queue is empty
00074         bool pop_front(OUT T& t) {
00075                         bool retval = false;
00076                         mlock l(m_mutex);
00077                         if (m_queue.size()) {
00078                                 ++m_rvn;        // queue has changed!
00079                                 retval = true;
00080                                 t = m_queue.front();
00081                                 m_queue.pop_front();
00082                         }
00083                         return retval;
00084                 }
00085 
00086         /// returns the number of elements in the queue
00087         int size(void) const throw() {
00088                         queue_t * pThis = this->getNonConst();
00089                         mlock l(pThis->m_mutex);
00090                         return pThis->m_queue.size();
00091                 }
00092 
00093         /// resets the given iterator to point to the beginning of the queue
00094         void getIterator(OUT iterator_t& i) const throw() {
00095                         queue_t * pThis = this->getNonConst();
00096                         mlock l(pThis->m_mutex);
00097                         i.rvn = m_rvn;
00098                         i.iter = pThis->m_queue.begin();
00099                 }
00100 
00101         /// gets element pointed to by iterator, and increments iterator
00102         bool getNextElement(IO iterator_t& i, OUT T& t) const {
00103                         queue_t * pThis = this->getNonConst();
00104                         mlock l(pThis->m_mutex);
00105                         if (i.rvn != m_rvn) {
00106                                 return false;   // iterator has been invalidated
00107                         }
00108                         if (pThis->m_queue.end() == i.iter) {
00109                                 return false;   // already at end of queue
00110                         }
00111 
00112                         // iterator is valid!  get element and increment
00113                         t = *i.iter;
00114                         ++i.iter;
00115                         return true;
00116                 }
00117 
00118 private:
00119         // private helper methods ----------------------------------------------
00120         // really ugly method to retrieve a non-const pointer.  This is because
00121         //      read-only APIs need to take the mutex, even though they
00122         //      otherwise won't modify the collection
00123         queue_t * getNonConst(void) const throw() {
00124                         return (queue_t *) this;
00125                 }
00126 
00127         // private member data -------------------------------------------------
00128         smart_mutex             m_mutex;
00129         std::deque<T>           m_queue;
00130         dword_t                 m_rvn;          // record version number
00131 };
00132 
00133 
00134 #endif  // WAVEPACKET_THREADSAFE_QUEUE_H__
00135