threadpool.cpp

Go to the documentation of this file.
00001 /*
00002  * threadpool.cpp
00003  *
00004  * Copyright (C) 2007  Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  *
00031  * Implementation of a basic threadpool object.
00032  *
00033  * NOTE: at the moment, the implementation is *VERY* basic!  There is no pool at
00034  * all, and instead threads are created for every incoming request.  But I
00035  * wanted to at least set up the abstraction, and this can later be filled in
00036  * with real threadpool behavior.
00037  */
00038 
00039 // includes --------------------------------------------------------------------
00040 #include "threadpool.h"         // always include our own header first!
00041 
00042 #include "common/wave_ex.h"
00043 #include "datahash/datahash_util.h"
00044 #include "perf/perf.h"
00045 #include "threadsafe/smart_mutex.h"
00046 #include "threadsafe/threadsafe.h"
00047 
00048 
00049 namespace threadpool {
00050 
00051 
00052 // interface destructor implementation
00053 Pool::~Pool(void) throw() { }
00054 
00055 
00056 ////////////////////////////////////////////////////////////////////////////////
00057 //
00058 //      static helper methods
00059 //
00060 ////////////////////////////////////////////////////////////////////////////////
00061 
00062 
00063 ////////////////////////////////////////////////////////////////////////////////
00064 //
00065 //      TPool - object that implements the threadpool::Pool interface
00066 //
00067 ////////////////////////////////////////////////////////////////////////////////
00068 
00069 class TPool : public Pool {
00070 public:
00071         TPool(void) throw();
00072         ~TPool(void) throw() { }
00073 
00074         // public class methods ------------------------------------------------
00075         void initialize(IN const Datahash * params);
00076 
00077         // threadpool::Pool class interface methods ----------------------------
00078         bool addRequest(IN thread_work_fn work_fn,
00079                                 IN void * context);
00080 
00081 private:
00082         // private typedefs ----------------------------------------------------
00083         struct request_t {
00084                 request_t(void) { bzero(this, sizeof(request_t)); }
00085 
00086                 thread_work_fn          fn;
00087                 void *                  context;
00088                 TPool *                 pThis;
00089         };
00090 
00091         // private helper functions --------------------------------------------
00092         void updateThreadCount(IN int increment) throw();
00093         static void * poolThreadStart(IN void * ctx);
00094 
00095         // private data members ------------------------------------------------
00096         smart_mutex     m_mutex;
00097         int             m_maxThreadCount;
00098         int             m_threadCount;
00099 };
00100 
00101 
00102 
00103 TPool::TPool
00104 (
00105 void
00106 )
00107 throw()
00108 {
00109         m_maxThreadCount = 0;
00110         m_threadCount = 0;
00111 }
00112 
00113 
00114 
00115 void
00116 TPool::initialize
00117 (
00118 IN const Datahash * params
00119 )
00120 {
00121         ASSERT(params, "null");
00122 
00123         m_maxThreadCount = getInt(params, "maxThreads");
00124         ASSERT(m_maxThreadCount > 0, "Bad max thread count: %d",
00125             m_maxThreadCount);
00126 }
00127 
00128 
00129 
00130 bool
00131 TPool::addRequest
00132 (
00133 IN thread_work_fn fn,
00134 IN void * context
00135 )
00136 {
00137         perf::Timer timer("threadpool::addRequest");
00138         ASSERT(fn, "null");
00139         // ASSERT(context) -- we don't care!
00140 
00141         // NOTE: we aren't actually maintaining a thread pool!  But we at
00142         // least keep an eye on the thread count...
00143 
00144         smart_ptr<request_t> req = new request_t;
00145         ASSERT(req, "out of memory");
00146 
00147         req->fn = fn;
00148         req->context = context;
00149         req->pThis = this;
00150 
00151         thread_id_t thread_id_unused;
00152         ASSERT_THROW(!createThread(poolThreadStart, req, thread_id_unused),
00153             "Failed to create thread for threadpool?");
00154         req.disown();   // thread owns this now!
00155 
00156         // worked just fine
00157         return true;
00158 }
00159 
00160 
00161 
00162 void *
00163 TPool::poolThreadStart
00164 (
00165 IN void * ctx
00166 )
00167 {
00168         request_t * preq = (request_t *) ctx;
00169         ASSERT(preq, "null thread context?");
00170         smart_ptr<request_t> req = preq;
00171         ASSERT(req, "failed to assign smart pointer?");
00172         ASSERT(req->pThis, "null");
00173         ASSERT(req->fn, "null thread work function");
00174         // ASSERT(req->context) - we don't care
00175 
00176         // increment thread count
00177         req->pThis->updateThreadCount(+1);
00178 
00179         // invoke provided work function and provide context
00180         try {
00181                 req->fn(req->context);
00182         } catch (std::exception& e) {
00183                 DPRINTF("Exception in thread pool while handling request: %s",
00184                     e.what());
00185         } catch (...) {
00186                 DPRINTF("Unknown exception handling thread pool request!");
00187         }
00188 
00189         // decrement thread count
00190         req->pThis->updateThreadCount(-1);
00191 
00192         // all done
00193         return NULL;
00194 }
00195 
00196 
00197 
00198 void
00199 TPool::updateThreadCount
00200 (
00201 IN int increment
00202 )
00203 throw()
00204 {
00205         int count = 0;
00206         {
00207                 mlock lock(m_mutex);
00208                 m_threadCount += increment;
00209                 count = m_threadCount;
00210         }
00211 
00212         if (count < 0) {
00213                 DPRINTF("ERROR!  Negative thread count? %d", count);
00214         }
00215         if (count > m_maxThreadCount) {
00216                 DPRINTF("WARNING!  Thread count is higher than requested size");
00217                 DPRINTF("  Requested %d threads, have %d currently",
00218                     m_maxThreadCount, count);
00219         }
00220 }
00221 
00222 
00223 
00224 ////////////////////////////////////////////////////////////////////////////////
00225 //
00226 //      public API
00227 //
00228 ////////////////////////////////////////////////////////////////////////////////
00229 
00230 smart_ptr<Pool>
00231 Pool::create
00232 (
00233 IN const Datahash * params
00234 )
00235 {
00236         ASSERT(params, "null");
00237 
00238         smart_ptr<TPool> local = new TPool;
00239         ASSERT(local, "out of memory");
00240 
00241         local->initialize(params);
00242 
00243         return local;
00244 }
00245 
00246 
00247 
00248 };      // threadpool namespace
00249