$search
00001 #ifndef CASTOR_NET_DEFRAGMENT_H 00002 #define CASTOR_NET_DEFRAGMENT_H 1 00003 00004 /* 00005 * $Id: Defragment.h 1679 2006-11-16 15:42:23Z kb $ 00006 * 00007 * 00008 * Copyright 2008 Carpe Noctem, Distributed Systems Group, 00009 * University of Kassel. All right reserved. 00010 * 00011 * The code is derived from the software contributed to Carpe Noctem by 00012 * the Carpe Noctem Team. 00013 * 00014 * The code is licensed under the Carpe Noctem Userfriendly BSD-Based 00015 * License (CNUBBL). Redistribution and use in source and binary forms, 00016 * with or without modification, are permitted provided that the 00017 * conditions of the CNUBBL are met. 00018 * 00019 * You should have received a copy of the CNUBBL along with this 00020 * software. The license is also available on our website: 00021 * http://carpenoctem.das-lab.net/license.txt 00022 * 00023 * 00024 * The Castor socket class 00025 */ 00026 00027 #include "Messages.h" 00028 #include "DateTime.h" 00029 00030 #include <boost/shared_ptr.hpp> 00031 #include <boost/thread/mutex.hpp> 00032 #include <map> 00033 00034 namespace castor { namespace net { 00035 00036 template<class Type, class Decoder> 00037 class Defragment { 00038 00039 protected: 00040 00041 typedef boost::shared_ptr<Type> TypePtr; 00042 typedef boost::shared_ptr<Decoder> DecoderPtr; 00043 00044 struct FragmentInfo { 00045 00046 TypePtr fragment; 00047 unsigned long long timestamp; 00048 00049 FragmentInfo(TypePtr fragment) : 00050 fragment(fragment), timestamp(DateTime::getUtcNow().getTicks()) 00051 { 00052 } 00053 00054 FragmentInfo(const FragmentInfo &other) : 00055 fragment(other.fragment), timestamp(other.timestamp) 00056 { 00057 } 00058 }; 00059 00060 struct KeyInfo { 00061 00062 int fragId; 00063 NetAddress address; 00064 00065 KeyInfo(int fragId, NetAddress address) : 00066 fragId(fragId), address(address) 00067 { 00068 } 00069 00070 KeyInfo(const KeyInfo &other) : 00071 fragId(other.fragId), address(other.address) 00072 { 00073 } 00074 00075 friend bool operator<(const KeyInfo &one, const KeyInfo &other) { 00076 if (one.fragId < other.fragId) return true; 00077 if (one.fragId != other.fragId) return false; 00078 00079 return (one.address < other.address); 00080 } 00081 }; 00082 00083 typedef std::map<KeyInfo, FragmentInfo> FragmentMap; 00084 00085 FragmentMap fragments; 00086 TypePtr reference; 00087 00088 boost::mutex monitor; 00089 00090 unsigned long long invPeriod; 00091 unsigned long long lastInvalidated; 00092 00093 public: 00094 00095 Defragment(unsigned long long invPeriod) : 00096 fragments(), reference(TypePtr(new Type)), 00097 monitor(), invPeriod(invPeriod), lastInvalidated(0) 00098 { 00099 } 00100 00101 TypePtr getMessage(const char *data, size_t offset, size_t count, const NetAddress &address) { 00102 00103 invalidate(); 00104 00105 DecoderPtr dec = DecoderPtr(new Decoder(data, offset, count)); 00106 00107 int fragId = this->reference->getFragmentId(dec); 00108 00109 // If the message is not fragmented, return it immediately 00110 if (fragId == -1) { 00111 return boost::static_pointer_cast<Type>(this->reference->decodeAny(dec)); 00112 } 00113 00114 // Otherwise, try to locate and reassemble it 00115 boost::mutex::scoped_lock lock(this->monitor); 00116 00117 KeyInfo ki(fragId, address); 00118 00119 // If there is a fragment with this id already, apply the new fragment 00120 typename FragmentMap::iterator itr = this->fragments.find(ki); 00121 00122 if (itr != this->fragments.end()) { 00123 00124 TypePtr result = itr->second.fragment; 00125 00126 if (result->decodePart(dec)) { 00127 00128 // If the fragment was successfully reassembled, remove and return it 00129 this->fragments.erase(itr); 00130 00131 return result; 00132 } 00133 00134 } else { 00135 00136 // Add a new fragment 00137 TypePtr m = boost::static_pointer_cast<Type>(this->reference->decodeAnyPart(dec)); 00138 00139 this->fragments.insert(std::pair<KeyInfo, FragmentInfo>(ki, FragmentInfo(m))); 00140 } 00141 00142 return TypePtr(); 00143 } 00144 00145 00146 protected: 00147 00148 void invalidate() { 00149 00150 unsigned long long now = DateTime::getUtcNow().getTicks(); 00151 00152 if (((now - this->lastInvalidated) / 10000) > this->invPeriod) { 00153 00154 boost::mutex::scoped_lock lock(this->monitor); 00155 00156 for (typename FragmentMap::iterator itr = this->fragments.begin(); 00157 itr != this->fragments.end(); itr++) 00158 { 00159 if (((now - itr->second.timestamp) / 10000) > this->invPeriod) { 00160 00161 typename FragmentMap::iterator itrNext = itr; 00162 itrNext++; 00163 00164 this->fragments.erase(itr); 00165 00166 if (itrNext == this->fragments.end()) { 00167 break; 00168 } 00169 00170 itr = itrNext; 00171 } 00172 } 00173 } 00174 00175 this->lastInvalidated = now; 00176 } 00177 }; 00178 } } 00179 00180 #endif /* CASTOR_SPICA_DEFRAGMENT_H */ 00181