Defragment.h
Go to the documentation of this file.
00001 #ifndef CASTOR_NET_DEFRAGMENT_H
00002 #define CASTOR_NET_DEFRAGMENT_H 1
00003 
00004 /*
00005  * $Id: Defragment.h 1679 2006-11-16 15:42:23Z kb $
00006  *
00007  *
00008  * Copyright 2008 Carpe Noctem, Distributed Systems Group,
00009  * University of Kassel. All right reserved.
00010  *
00011  * The code is derived from the software contributed to Carpe Noctem by
00012  * the Carpe Noctem Team.
00013  *
00014  * The code is licensed under the Carpe Noctem Userfriendly BSD-Based
00015  * License (CNUBBL). Redistribution and use in source and binary forms,
00016  * with or without modification, are permitted provided that the
00017  * conditions of the CNUBBL are met.
00018  *
00019  * You should have received a copy of the CNUBBL along with this
00020  * software. The license is also available on our website:
00021  * http://carpenoctem.das-lab.net/license.txt
00022  *
00023  *
00024  * The Castor socket class
00025  */
00026 
00027 #include "Messages.h"
00028 #include "DateTime.h"
00029 
00030 #include <boost/shared_ptr.hpp>
00031 #include <boost/thread/mutex.hpp>
00032 #include <map>
00033 
00034 namespace castor { namespace net {
00035 
00036         template<class Type, class Decoder>
00037                 class Defragment {
00038 
00039                         protected:
00040 
00041                                 typedef boost::shared_ptr<Type> TypePtr;
00042                                 typedef boost::shared_ptr<Decoder> DecoderPtr;
00043 
00044                                 struct FragmentInfo {
00045                                    
00046                                         TypePtr fragment;
00047                                         unsigned long long timestamp;
00048            
00049                                         FragmentInfo(TypePtr fragment) :
00050                                                 fragment(fragment), timestamp(DateTime::getUtcNow().getTicks())
00051                                         {
00052                                         }
00053 
00054                                         FragmentInfo(const FragmentInfo &other) :
00055                                                 fragment(other.fragment), timestamp(other.timestamp)
00056                                         {
00057                                         }
00058                                 };
00059    
00060                                 struct KeyInfo {
00061  
00062                                         int fragId;
00063                                         NetAddress address;
00064  
00065                                         KeyInfo(int fragId, NetAddress address) :
00066                                                 fragId(fragId), address(address)
00067                                         {
00068                                         }
00069  
00070                                         KeyInfo(const KeyInfo &other) :
00071                                                 fragId(other.fragId), address(other.address)
00072                                         {
00073                                         }
00074  
00075                                         friend bool operator<(const KeyInfo &one, const KeyInfo &other) {
00076                                                 if (one.fragId < other.fragId) return true;
00077                                                 if (one.fragId != other.fragId) return false;
00078                                                 
00079                                                 return (one.address < other.address);
00080                                         }
00081                                 };
00082 
00083                                 typedef std::map<KeyInfo, FragmentInfo> FragmentMap;
00084 
00085                                 FragmentMap fragments;
00086                                 TypePtr reference;
00087 
00088                                 boost::mutex monitor;
00089                                 
00090                                 unsigned long long invPeriod;
00091                                 unsigned long long lastInvalidated;
00092 
00093                         public:
00094 
00095                                 Defragment(unsigned long long invPeriod) :
00096                                         fragments(), reference(TypePtr(new Type)),
00097                                         monitor(), invPeriod(invPeriod), lastInvalidated(0)
00098                                 {
00099                                 }
00100 
00101                                 TypePtr getMessage(const char *data, size_t offset, size_t count, const NetAddress &address) {
00102         
00103                                         invalidate();
00104  
00105                                         DecoderPtr dec = DecoderPtr(new Decoder(data, offset, count));
00106  
00107                                         int fragId = this->reference->getFragmentId(dec);
00108  
00109                                         // If the message is not fragmented, return it immediately
00110                                         if (fragId == -1) {
00111                                                 return boost::static_pointer_cast<Type>(this->reference->decodeAny(dec));
00112                                         }
00113                 
00114                                         // Otherwise, try to locate and reassemble it
00115                                         boost::mutex::scoped_lock lock(this->monitor);
00116         
00117                                         KeyInfo ki(fragId, address);
00118         
00119                                         // If there is a fragment with this id already, apply the new fragment
00120                                         typename FragmentMap::iterator itr = this->fragments.find(ki);
00121 
00122                                         if (itr != this->fragments.end()) {
00123 
00124                                                 TypePtr result = itr->second.fragment;
00125         
00126                                                 if (result->decodePart(dec)) {
00127         
00128                                                         // If the fragment was successfully reassembled, remove and return it
00129                                                         this->fragments.erase(itr);
00130         
00131                                                         return result;
00132                                                 }
00133         
00134                                         } else {
00135         
00136                                                 // Add a new fragment
00137                                                 TypePtr m = boost::static_pointer_cast<Type>(this->reference->decodeAnyPart(dec));
00138         
00139                                                 this->fragments.insert(std::pair<KeyInfo, FragmentInfo>(ki, FragmentInfo(m)));
00140                                         }
00141 
00142                                         return TypePtr();
00143                                 }
00144 
00145 
00146                         protected:
00147                                 
00148                                 void invalidate() {
00149                 
00150                                         unsigned long long now = DateTime::getUtcNow().getTicks();
00151                 
00152                                         if (((now - this->lastInvalidated) / 10000) > this->invPeriod) {
00153                 
00154                                                 boost::mutex::scoped_lock lock(this->monitor);
00155 
00156                                                 for (typename FragmentMap::iterator itr = this->fragments.begin();
00157                                                          itr != this->fragments.end(); itr++)
00158                                                 {
00159                                                         if (((now - itr->second.timestamp) / 10000) > this->invPeriod) {
00160 
00161                                                                 typename FragmentMap::iterator itrNext = itr;
00162                                                                 itrNext++;
00163                                                                 
00164                                                                 this->fragments.erase(itr);
00165                                                                 
00166                                                                 if (itrNext == this->fragments.end()) {
00167                                                                         break;
00168                                                                 }
00169 
00170                                                                 itr = itrNext;
00171                                                         }
00172                                                 }
00173                                         }
00174 
00175                                         this->lastInvalidated = now;
00176                                 }
00177                         };
00178 } }
00179 
00180 #endif /* CASTOR_SPICA_DEFRAGMENT_H */
00181 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines


Castor
Author(s): Carpe Noctem
autogenerated on Fri Nov 8 2013 11:05:39