Go to the documentation of this file.00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 #include "threadpool.h"         
00041 
00042 #include "common/wave_ex.h"
00043 #include "datahash/datahash_util.h"
00044 #include "perf/perf.h"
00045 #include "threadsafe/smart_mutex.h"
00046 #include "threadsafe/threadsafe.h"
00047 
00048 
00049 namespace threadpool {
00050 
00051 
00052 
00053 Pool::~Pool(void) throw() { }
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 class TPool : public Pool {
00070 public:
00071         TPool(void) throw();
00072         ~TPool(void) throw() { }
00073 
00074         
00075         void initialize(IN const Datahash * params);
00076 
00077         
00078         bool addRequest(IN thread_work_fn work_fn,
00079                                 IN void * context);
00080 
00081 private:
00082         
00083         struct request_t {
00084                 request_t(void) { bzero(this, sizeof(request_t)); }
00085 
00086                 thread_work_fn          fn;
00087                 void *                  context;
00088                 TPool *                 pThis;
00089         };
00090 
00091         
00092         void updateThreadCount(IN int increment) throw();
00093         static void * poolThreadStart(IN void * ctx);
00094 
00095         
00096         smart_mutex     m_mutex;
00097         int             m_maxThreadCount;
00098         int             m_threadCount;
00099 };
00100 
00101 
00102 
00103 TPool::TPool
00104 (
00105 void
00106 )
00107 throw()
00108 {
00109         m_maxThreadCount = 0;
00110         m_threadCount = 0;
00111 }
00112 
00113 
00114 
00115 void
00116 TPool::initialize
00117 (
00118 IN const Datahash * params
00119 )
00120 {
00121         ASSERT(params, "null");
00122 
00123         m_maxThreadCount = getInt(params, "maxThreads");
00124         ASSERT(m_maxThreadCount > 0, "Bad max thread count: %d",
00125             m_maxThreadCount);
00126 }
00127 
00128 
00129 
00130 bool
00131 TPool::addRequest
00132 (
00133 IN thread_work_fn fn,
00134 IN void * context
00135 )
00136 {
00137         perf::Timer timer("threadpool::addRequest");
00138         ASSERT(fn, "null");
00139         
00140 
00141         
00142         
00143 
00144         smart_ptr<request_t> req = new request_t;
00145         ASSERT(req, "out of memory");
00146 
00147         req->fn = fn;
00148         req->context = context;
00149         req->pThis = this;
00150 
00151         thread_id_t thread_id_unused;
00152         ASSERT_THROW(!createThread(poolThreadStart, req, thread_id_unused),
00153             "Failed to create thread for threadpool?");
00154         req.disown();   
00155 
00156         
00157         return true;
00158 }
00159 
00160 
00161 
00162 void *
00163 TPool::poolThreadStart
00164 (
00165 IN void * ctx
00166 )
00167 {
00168         request_t * preq = (request_t *) ctx;
00169         ASSERT(preq, "null thread context?");
00170         smart_ptr<request_t> req = preq;
00171         ASSERT(req, "failed to assign smart pointer?");
00172         ASSERT(req->pThis, "null");
00173         ASSERT(req->fn, "null thread work function");
00174         
00175 
00176         
00177         req->pThis->updateThreadCount(+1);
00178 
00179         
00180         try {
00181                 req->fn(req->context);
00182         } catch (std::exception& e) {
00183                 DPRINTF("Exception in thread pool while handling request: %s",
00184                     e.what());
00185         } catch (...) {
00186                 DPRINTF("Unknown exception handling thread pool request!");
00187         }
00188 
00189         
00190         req->pThis->updateThreadCount(-1);
00191 
00192         
00193         return NULL;
00194 }
00195 
00196 
00197 
00198 void
00199 TPool::updateThreadCount
00200 (
00201 IN int increment
00202 )
00203 throw()
00204 {
00205         int count = 0;
00206         {
00207                 mlock lock(m_mutex);
00208                 m_threadCount += increment;
00209                 count = m_threadCount;
00210         }
00211 
00212         if (count < 0) {
00213                 DPRINTF("ERROR!  Negative thread count? %d", count);
00214         }
00215         if (count > m_maxThreadCount) {
00216                 DPRINTF("WARNING!  Thread count is higher than requested size");
00217                 DPRINTF("  Requested %d threads, have %d currently",
00218                     m_maxThreadCount, count);
00219         }
00220 }
00221 
00222 
00223 
00224 
00225 
00226 
00227 
00228 
00229 
00230 smart_ptr<Pool>
00231 Pool::create
00232 (
00233 IN const Datahash * params
00234 )
00235 {
00236         ASSERT(params, "null");
00237 
00238         smart_ptr<TPool> local = new TPool;
00239         ASSERT(local, "out of memory");
00240 
00241         local->initialize(params);
00242 
00243         return local;
00244 }
00245 
00246 
00247 
00248 };      
00249