Go to the documentation of this file.00001 #ifndef MEGATREE_HBASE_STORAGE_H_
00002 #define MEGATREE_HBASE_STORAGE_H_
00003
#include <stdint.h>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <exception>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

#include <boost/filesystem.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>

#include <megatree/storage.h>

#include <Hbase.h>
#include <protocol/TBinaryProtocol.h>
#include <transport/TBufferTransports.h>
#include <transport/TSocket.h>
00020
00021
00022 namespace megatree {
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 class HbaseStorage : public Storage
00033 {
00034 public:
00035 HbaseStorage(const boost::filesystem::path &_root);
00036 virtual ~HbaseStorage();
00037
00038 virtual void getBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results);
00039 virtual void putBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data);
00040
00041 virtual void getAsync(const boost::filesystem::path &path, GetCallback callback);
00042 virtual void putAsync(const boost::filesystem::path &path, const ByteVec& data, PutCallback callback);
00043
00044 virtual std::string getType() {return std::string("HBaseStorage"); };
00045
00046 private:
00047 virtual void getBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
00048 const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results);
00049
00050 virtual void putBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
00051 const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data);
00052
00053
00054
00055 boost::mutex socket_mutex;
00056 boost::shared_ptr<apache::thrift::transport::TSocket> socket;
00057 boost::shared_ptr<apache::thrift::transport::TTransport> transport;
00058 boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol;
00059 boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> client;
00060 std::string table;
00061
00062 std::string server;
00063 unsigned int port;
00064
00065
00066 bool async_thread_keep_running;
00067 std::vector<boost::shared_ptr<boost::thread> > async_threads;
00068 void asyncReadThread();
00069 void asyncWriteThread();
00070
00071
00072 struct ReadData
00073 {
00074 ReadData(const boost::filesystem::path& path_, const GetCallback& callback_)
00075 : path(path_), callback(callback_){}
00076 boost::filesystem::path path;
00077 GetCallback callback;
00078 };
00079 typedef std::deque<ReadData> ReadQueue;
00080 ReadQueue read_queue;
00081 boost::mutex read_queue_mutex;
00082 boost::condition read_queue_cond;
00083
00084
00085 struct WriteData
00086 {
00087 WriteData(const boost::filesystem::path& path_, const ByteVec& data_, const PutCallback& callback_)
00088 : path(path_), data(data_), callback(callback_){}
00089
00090 boost::filesystem::path path;
00091 ByteVec data;
00092 PutCallback callback;
00093 };
00094 typedef std::deque<WriteData> WriteQueue;
00095 WriteQueue write_queue;
00096 boost::mutex write_queue_mutex;
00097 boost::condition write_queue_cond;
00098
00099 void writeFile(const std::string& row, const std::vector<unsigned char>& buffer);
00100 bool readFile(const std::string& row, std::string& buffer);
00101 };
00102
00103
00104
00105 void removeHbasePath(const boost::filesystem::path &path);
00106
00107
00108 class HbaseTempDir : public TempDir
00109 {
00110 public:
00111 HbaseTempDir(const boost::filesystem::path &prefix = "", bool remove = true)
00112 : remove_(remove)
00113 {
00114 abort();
00115 std::string tmp_storage = prefix.string() + "XXXXXX";
00116 char *tmp = mkdtemp(&tmp_storage[0]);
00117 assert(tmp);
00118 printf("Temporary directory: %s\n", tmp);
00119
00120 path_ = tmp;
00121 }
00122
00123 ~HbaseTempDir()
00124 {
00125 if (remove_)
00126 boost::filesystem::remove_all(path_);
00127 }
00128
00129 const boost::filesystem::path &getPath() const
00130 {
00131 return path_;
00132 }
00133
00134 private:
00135 boost::filesystem::path path_;
00136 bool remove_;
00137 };
00138
00139
00140
00141 }
00142
00143 #endif