netrq.cpp

Go to the documentation of this file.
00001 /*
00002  * netrq.cpp
00003  *
00004  * Copyright (c) 2009 Thomas A. Vaughan
00005  * All rights reserved.
00006  *
00007  *
00008  * Redistribution and use in source and binary forms, with or without
00009  * modification, are permitted provided that the following conditions are met:
00010  *     * Redistributions of source code must retain the above copyright
00011  *       notice, this list of conditions and the following disclaimer.
00012  *     * Redistributions in binary form must reproduce the above copyright
00013  *       notice, this list of conditions and the following disclaimer in the
00014  *       documentation and/or other materials provided with the distribution.
00015  *     * Neither the name of the <organization> nor the
00016  *       names of its contributors may be used to endorse or promote products
00017  *       derived from this software without specific prior written permission.
00018  *
00019  * THIS SOFTWARE IS PROVIDED BY THOMAS A. VAUGHAN ''AS IS'' AND ANY
00020  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00022  * DISCLAIMED. IN NO EVENT SHALL THOMAS A. VAUGHAN BE LIABLE FOR ANY
00023  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
00024  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00025  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
00026  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00027  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
00028  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00029  *
00030  * Support for network request queues.  See netrq.h
00031  */
00032 
00033 // includes --------------------------------------------------------------------
00034 #include "netrq.h"              // always include our own header first
00035 
00036 #include <list>
00037 
00038 #include "common/wave_ex.h"
00039 #include "perf/perf.h"
00040 #include "threadsafe/smart_mutex.h"
00041 
00042 
00043 namespace netrq {
00044 
00045 /// \ingroup netrq
00046 /*@{*/
00047 
00048 
00049 // interface destructors
00050 Request::~Request(void) throw() { }
00051 Queue::~Queue(void) throw() { }
00052 
00053 
00054 
00055 ////////////////////////////////////////////////////////////////////////////////
00056 //
00057 //      static helper methods
00058 //
00059 ////////////////////////////////////////////////////////////////////////////////
00060 
00061 
00062 ////////////////////////////////////////////////////////////////////////////////
00063 //
00064 //      NetRQ -- class that implements the Queue interface
00065 //
00066 ////////////////////////////////////////////////////////////////////////////////
00067 
00068 class NetRQ : public Queue {
00069 public:
00070         // constructor, destructor ---------------------------------------------
00071         NetRQ(void) throw() { }
00072         ~NetRQ(void) throw() { }
00073 
00074         // public class methods ------------------------------------------------
00075         void initialize(void) { }
00076 
00077         // netrq::Queue class interface methods --------------------------------
00078         int size(void);
00079         eInsertResult addRequest(IN dword_t clock,
00080                                 IN smart_ptr<Request>& request);
00081         bool getNextRequest(IN dword_t clock,
00082                                 OUT smart_ptr<Request>& request);
00083         bool containsRequest(IN const char * id);
00084         bool removeRequest(IN const char * id);
00085 
00086 private:
00087         // private typedefs ----------------------------------------------------
00088         struct request_data_t {
00089                 // constructor, manipulators
00090                 request_data_t(void) throw() { this->clear(); }
00091                 void clear(void) throw() {
00092                                 request = NULL;
00093                                 clock = 0;
00094                         }
00095 
00096                 // data fields
00097                 smart_ptr<Request>              request;
00098                 dword_t                         clock;
00099         };
00100 
00101 
00102         typedef std::map<std::string, smart_ptr<request_data_t> > id_map_t;
00103         typedef std::list<smart_ptr<request_data_t> > req_queue_t;
00104         typedef std::map<dword_t, smart_ptr<req_queue_t> > clock_map_t;
00105 
00106         // private helper methods ----------------------------------------------
00107         smart_ptr<request_data_t> getRequestById(IN const char * id);
00108         req_queue_t * getQueueForClock(IN dword_t clock);
00109 
00110         // private data members ------------------------------------------------
00111         id_map_t                m_idMap;        // look up requests by id
00112         clock_map_t             m_clockMap;     // requests sorted by clock
00113         smart_mutex             m_mutex;
00114 };
00115 
00116 
00117 
00118 ////////////////////////////////////////////////////////////////////////////////
00119 //
00120 //      NetRQ -- netrq::Queue class interface methods
00121 //
00122 ////////////////////////////////////////////////////////////////////////////////
00123 
00124 int
00125 NetRQ::size
00126 (
00127 void
00128 )
00129 {
00130         // lock!
00131         mlock l(m_mutex);
00132 
00133         // assume id map and clock map are in sync!
00134         return m_idMap.size();
00135 }
00136 
00137 
00138 
00139 NetRQ::eInsertResult
00140 NetRQ::addRequest
00141 (
00142 IN dword_t clock,
00143 IN smart_ptr<Request>& request
00144 )
00145 {
00146         ASSERT(clock > 0, "bad clock value: %lu", (long) clock);
00147         ASSERT(request, "null");
00148 
00149         const char * id = request->getId();
00150         ASSERT(id, "null ID from request?");
00151 
00152         // lock!
00153         mlock l(m_mutex);
00154         smart_ptr<request_data_t> prd = this->getRequestById(id);
00155         if (prd) {
00156                 // already have this request!
00157                 if (clock < prd->clock) {
00158                         return eQueue_Failed_CurrentEntryHasLaterClock;
00159                 } else {
00160                         return eQueue_Failed_CurrentEntryHasEarlierClock;
00161                 }
00162         }
00163 
00164         // allocate request data structure
00165         prd = new request_data_t;
00166         ASSERT(prd, "out of memory");
00167         prd->clock = clock;
00168         prd->request = request;
00169 
00170         // look up the current clock, and add a queue if necessary
00171         req_queue_t * prq = this->getQueueForClock(clock);
00172         ASSERT(prq, "null");
00173 
00174         prq->push_back(prd);
00175         m_idMap[id] = prd;
00176         ASSERT(prd.get_ref_count() > 1, "bad ref count");
00177 
00178         // okay!
00179         return eQueue_Success;
00180 }
00181 
00182 
00183 
00184 bool
00185 NetRQ::getNextRequest
00186 (
00187 IN dword_t clock,
00188 OUT smart_ptr<Request>& request
00189 )
00190 {
00191         ASSERT(clock > 0, "Bad clock: %lu", (long) clock);
00192         request = NULL;
00193 
00194         // lock!
00195         mlock l(m_mutex);
00196 
00197         // get whatever is at the head of the clock queue
00198         clock_map_t::iterator head = m_clockMap.begin();
00199         if (m_clockMap.end() == head) {
00200                 return false;   // queue is empty!
00201         }
00202         if (head->first > clock) {
00203                 return false;   // next request can wait
00204         }
00205 
00206         // there is something in the map!
00207         req_queue_t * prq = head->second;       // sorry about the names...
00208         ASSERT(prq, "null queue in clock map?");
00209 
00210         req_queue_t::iterator i = prq->begin();
00211         ASSERT(prq->end() != i, "empty queue in clock map?");
00212 
00213         smart_ptr<request_data_t> prd = *i;
00214         ASSERT(prd, "null request data in queue");
00215 
00216         // give data to caller
00217         request = prd->request;
00218         return true;
00219 }
00220 
00221 
00222 
00223 bool
00224 NetRQ::containsRequest
00225 (
00226 IN const char * id
00227 )
00228 {
00229         ASSERT(id, "null");
00230 
00231         // see if we recognize this ID
00232         mlock l(m_mutex);
00233         id_map_t::iterator i = m_idMap.find(id);
00234         return (m_idMap.end() != i);
00235 }
00236 
00237 
00238 
00239 bool
00240 NetRQ::removeRequest
00241 (
00242 IN const char * id
00243 )
00244 {
00245         ASSERT(id, "null");
00246 
00247         // lock!
00248         mlock l(m_mutex);
00249 
00250         // do we even have this request?
00251         id_map_t::iterator iditer = m_idMap.find(id);
00252         if (m_idMap.end() == iditer) {
00253                 return false;           // don't know about this id!
00254         }
00255 
00256         // got it, so remove it
00257         request_data_t * prd = iditer->second;
00258         ASSERT(prd, "null request data in id map");
00259 
00260         // look for this clock
00261         clock_map_t::iterator ic = m_clockMap.find(prd->clock);
00262         ASSERT(m_clockMap.end() != ic,
00263             "request in ID map but no entry for clock");
00264         req_queue_t * prq = ic->second;
00265         ASSERT(prq, "null queue in clock map");
00266 
00267         // have to walk the queue...
00268         req_queue_t::iterator irq;
00269         for (irq = prq->begin(); irq != prq->end(); ++irq) {
00270                 request_data_t * iprd = *irq;
00271                 ASSERT(iprd, "null request data in queue");
00272                 ASSERT(iprd->request, "null");
00273                 if (!strcmp(iprd->request->getId(), id))
00274                         break;
00275         }
00276         ASSERT(prq->end() != irq,
00277             "request in ID map but not found in queue, id=%s", id);
00278         prq->erase(irq);
00279         if (!prq->size()) {
00280                 // queue is now empty, so delete this clock entry
00281                 m_clockMap.erase(ic);
00282         }
00283 
00284         // finally, nuke from ID map
00285         m_idMap.erase(iditer);
00286         return true;    // found it, nuked it
00287 }
00288 
00289 
00290 
00291 ////////////////////////////////////////////////////////////////////////////////
00292 //
00293 //      NetRQ -- private helper methods
00294 //
00295 ////////////////////////////////////////////////////////////////////////////////
00296 
00297 smart_ptr<NetRQ::request_data_t>
00298 NetRQ::getRequestById
00299 (
00300 IN const char * id
00301 )
00302 {
00303         ASSERT(id, "null");
00304 
00305         // NOTE: this routine is private, and assumes the caller has locked!
00306         id_map_t::iterator i = m_idMap.find(id);
00307         if (m_idMap.end() == i)
00308                 return NULL;            // id not found
00309 
00310         smart_ptr<request_data_t> prd = i->second;
00311         ASSERT(prd, "null request data in id map");
00312         return prd;
00313 }
00314 
00315 
00316 
00317 NetRQ::req_queue_t *
00318 NetRQ::getQueueForClock
00319 (
00320 IN dword_t clock
00321 )
00322 {
00323         // NOTE: this routine is private, and assumes the caller has locked!
00324         clock_map_t::iterator i = m_clockMap.find(clock);
00325         if (m_clockMap.end() != i) {
00326                 // yes, we have a queue for this clock!
00327                 req_queue_t * prq = i->second;
00328                 ASSERT(prq, "null queue in clock map");
00329                 return prq;
00330         }
00331 
00332         // no queue found for clock!  create one
00333         smart_ptr<req_queue_t> rq = new req_queue_t;
00334         ASSERT(rq, "out of memory");
00335 
00336         // give the clock map a reference, and return to caller
00337         m_clockMap[clock] = rq;
00338         ASSERT(2 == rq.get_ref_count(), "bad ref count");
00339         return rq;
00340 }
00341 
00342 
00343 
00344 ////////////////////////////////////////////////////////////////////////////////
00345 //
00346 //      Public APIs
00347 //
00348 ////////////////////////////////////////////////////////////////////////////////
00349 
00350 smart_ptr<Queue>
00351 Queue::create
00352 (
00353 void
00354 )
00355 {
00356         smart_ptr<NetRQ> local = new NetRQ;
00357         ASSERT(local, "out of memory");
00358 
00359         local->initialize();
00360 
00361         return local;
00362 }
00363 
00364 
00365 
00366 int
00367 sendMessagesFromQueue
00368 (
00369 IO xdrbuf::Output * output,
00370 IN dword_t clock,
00371 IO Queue * queue
00372 )
00373 {
00374         ASSERT(output, "null");
00375         // ASSERT(clock) -- we don't care!
00376         ASSERT(queue, "null");
00377 
00378         // xdr seems to pad added buffers?
00379         const int xdrPadding = 8;
00380 
00381         // keep looping while we have messages and room
00382         int count = 0;
00383         smart_ptr<Request> request;
00384         while (queue->getNextRequest(clock, request)) {
00385                 ASSERT(request, "null");
00386                 int remain = output->getRemainingBytes();
00387         //      DPRINTF("%d bytes remaining in output buffer", remain);
00388                 int req = request->getMaxBytes();
00389                 if (remain - req < xdrPadding) {
00390                         break;          // can't add any more...
00391                 }
00392 
00393                 // okay, write this out
00394         //      int left = remain - req;
00395         //      DPRINTF("  Adding request: %s", request->getId());
00396         //      DPRINTF("    request bytes: %d (leaves %d?)", req, left);
00397                 request->write(output);
00398 
00399                 // remove request
00400                 queue->removeRequest(request->getId());
00401                 ++count;
00402 
00403                 // re-add?
00404                 int retry = request->getRetryInterval();
00405                 if (retry > 0)
00406                         queue->addRequest(clock + retry, request);
00407         }
00408 
00409         return count;
00410 }
00411 
00412 
00413 
00414 };      // netrq namespace
00415