Go to the documentation of this file.00001 #ifndef CASTOR_NET_DEFRAGMENT_H
00002 #define CASTOR_NET_DEFRAGMENT_H 1
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027 #include "Messages.h"
00028 #include "DateTime.h"
00029
00030 #include <boost/shared_ptr.hpp>
00031 #include <boost/thread/mutex.hpp>
00032 #include <map>
00033
00034 namespace castor { namespace net {
00035
00036 template<class Type, class Decoder>
00037 class Defragment {
00038
00039 protected:
00040
00041 typedef boost::shared_ptr<Type> TypePtr;
00042 typedef boost::shared_ptr<Decoder> DecoderPtr;
00043
00044 struct FragmentInfo {
00045
00046 TypePtr fragment;
00047 unsigned long long timestamp;
00048
00049 FragmentInfo(TypePtr fragment) :
00050 fragment(fragment), timestamp(DateTime::getUtcNow().getTicks())
00051 {
00052 }
00053
00054 FragmentInfo(const FragmentInfo &other) :
00055 fragment(other.fragment), timestamp(other.timestamp)
00056 {
00057 }
00058 };
00059
00060 struct KeyInfo {
00061
00062 int fragId;
00063 NetAddress address;
00064
00065 KeyInfo(int fragId, NetAddress address) :
00066 fragId(fragId), address(address)
00067 {
00068 }
00069
00070 KeyInfo(const KeyInfo &other) :
00071 fragId(other.fragId), address(other.address)
00072 {
00073 }
00074
00075 friend bool operator<(const KeyInfo &one, const KeyInfo &other) {
00076 if (one.fragId < other.fragId) return true;
00077 if (one.fragId != other.fragId) return false;
00078
00079 return (one.address < other.address);
00080 }
00081 };
00082
00083 typedef std::map<KeyInfo, FragmentInfo> FragmentMap;
00084
00085 FragmentMap fragments;
00086 TypePtr reference;
00087
00088 boost::mutex monitor;
00089
00090 unsigned long long invPeriod;
00091 unsigned long long lastInvalidated;
00092
00093 public:
00094
00095 Defragment(unsigned long long invPeriod) :
00096 fragments(), reference(TypePtr(new Type)),
00097 monitor(), invPeriod(invPeriod), lastInvalidated(0)
00098 {
00099 }
00100
00101 TypePtr getMessage(const char *data, size_t offset, size_t count, const NetAddress &address) {
00102
00103 invalidate();
00104
00105 DecoderPtr dec = DecoderPtr(new Decoder(data, offset, count));
00106
00107 int fragId = this->reference->getFragmentId(dec);
00108
00109
00110 if (fragId == -1) {
00111 return boost::static_pointer_cast<Type>(this->reference->decodeAny(dec));
00112 }
00113
00114
00115 boost::mutex::scoped_lock lock(this->monitor);
00116
00117 KeyInfo ki(fragId, address);
00118
00119
00120 typename FragmentMap::iterator itr = this->fragments.find(ki);
00121
00122 if (itr != this->fragments.end()) {
00123
00124 TypePtr result = itr->second.fragment;
00125
00126 if (result->decodePart(dec)) {
00127
00128
00129 this->fragments.erase(itr);
00130
00131 return result;
00132 }
00133
00134 } else {
00135
00136
00137 TypePtr m = boost::static_pointer_cast<Type>(this->reference->decodeAnyPart(dec));
00138
00139 this->fragments.insert(std::pair<KeyInfo, FragmentInfo>(ki, FragmentInfo(m)));
00140 }
00141
00142 return TypePtr();
00143 }
00144
00145
00146 protected:
00147
00148 void invalidate() {
00149
00150 unsigned long long now = DateTime::getUtcNow().getTicks();
00151
00152 if (((now - this->lastInvalidated) / 10000) > this->invPeriod) {
00153
00154 boost::mutex::scoped_lock lock(this->monitor);
00155
00156 for (typename FragmentMap::iterator itr = this->fragments.begin();
00157 itr != this->fragments.end(); itr++)
00158 {
00159 if (((now - itr->second.timestamp) / 10000) > this->invPeriod) {
00160
00161 typename FragmentMap::iterator itrNext = itr;
00162 itrNext++;
00163
00164 this->fragments.erase(itr);
00165
00166 if (itrNext == this->fragments.end()) {
00167 break;
00168 }
00169
00170 itr = itrNext;
00171 }
00172 }
00173 }
00174
00175 this->lastInvalidated = now;
00176 }
00177 };
00178 } }
00179
00180 #endif
00181