Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "threadpool.h"
00041
00042 #include "common/wave_ex.h"
00043 #include "datahash/datahash_util.h"
00044 #include "perf/perf.h"
00045 #include "threadsafe/smart_mutex.h"
00046 #include "threadsafe/threadsafe.h"
00047
00048
00049 namespace threadpool {
00050
00051
00052
00053 Pool::~Pool(void) throw() { }
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 class TPool : public Pool {
00070 public:
00071 TPool(void) throw();
00072 ~TPool(void) throw() { }
00073
00074
00075 void initialize(IN const Datahash * params);
00076
00077
00078 bool addRequest(IN thread_work_fn work_fn,
00079 IN void * context);
00080
00081 private:
00082
00083 struct request_t {
00084 request_t(void) { bzero(this, sizeof(request_t)); }
00085
00086 thread_work_fn fn;
00087 void * context;
00088 TPool * pThis;
00089 };
00090
00091
00092 void updateThreadCount(IN int increment) throw();
00093 static void * poolThreadStart(IN void * ctx);
00094
00095
00096 smart_mutex m_mutex;
00097 int m_maxThreadCount;
00098 int m_threadCount;
00099 };
00100
00101
00102
00103 TPool::TPool
00104 (
00105 void
00106 )
00107 throw()
00108 {
00109 m_maxThreadCount = 0;
00110 m_threadCount = 0;
00111 }
00112
00113
00114
00115 void
00116 TPool::initialize
00117 (
00118 IN const Datahash * params
00119 )
00120 {
00121 ASSERT(params, "null");
00122
00123 m_maxThreadCount = getInt(params, "maxThreads");
00124 ASSERT(m_maxThreadCount > 0, "Bad max thread count: %d",
00125 m_maxThreadCount);
00126 }
00127
00128
00129
00130 bool
00131 TPool::addRequest
00132 (
00133 IN thread_work_fn fn,
00134 IN void * context
00135 )
00136 {
00137 perf::Timer timer("threadpool::addRequest");
00138 ASSERT(fn, "null");
00139
00140
00141
00142
00143
00144 smart_ptr<request_t> req = new request_t;
00145 ASSERT(req, "out of memory");
00146
00147 req->fn = fn;
00148 req->context = context;
00149 req->pThis = this;
00150
00151 thread_id_t thread_id_unused;
00152 ASSERT_THROW(!createThread(poolThreadStart, req, thread_id_unused),
00153 "Failed to create thread for threadpool?");
00154 req.disown();
00155
00156
00157 return true;
00158 }
00159
00160
00161
00162 void *
00163 TPool::poolThreadStart
00164 (
00165 IN void * ctx
00166 )
00167 {
00168 request_t * preq = (request_t *) ctx;
00169 ASSERT(preq, "null thread context?");
00170 smart_ptr<request_t> req = preq;
00171 ASSERT(req, "failed to assign smart pointer?");
00172 ASSERT(req->pThis, "null");
00173 ASSERT(req->fn, "null thread work function");
00174
00175
00176
00177 req->pThis->updateThreadCount(+1);
00178
00179
00180 try {
00181 req->fn(req->context);
00182 } catch (std::exception& e) {
00183 DPRINTF("Exception in thread pool while handling request: %s",
00184 e.what());
00185 } catch (...) {
00186 DPRINTF("Unknown exception handling thread pool request!");
00187 }
00188
00189
00190 req->pThis->updateThreadCount(-1);
00191
00192
00193 return NULL;
00194 }
00195
00196
00197
00198 void
00199 TPool::updateThreadCount
00200 (
00201 IN int increment
00202 )
00203 throw()
00204 {
00205 int count = 0;
00206 {
00207 mlock lock(m_mutex);
00208 m_threadCount += increment;
00209 count = m_threadCount;
00210 }
00211
00212 if (count < 0) {
00213 DPRINTF("ERROR! Negative thread count? %d", count);
00214 }
00215 if (count > m_maxThreadCount) {
00216 DPRINTF("WARNING! Thread count is higher than requested size");
00217 DPRINTF(" Requested %d threads, have %d currently",
00218 m_maxThreadCount, count);
00219 }
00220 }
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230 smart_ptr<Pool>
00231 Pool::create
00232 (
00233 IN const Datahash * params
00234 )
00235 {
00236 ASSERT(params, "null");
00237
00238 smart_ptr<TPool> local = new TPool;
00239 ASSERT(local, "out of memory");
00240
00241 local->initialize(params);
00242
00243 return local;
00244 }
00245
00246
00247
00248 };
00249