hbase_storage.h
Go to the documentation of this file.
00001 #ifndef MEGATREE_HBASE_STORAGE_H_
00002 #define MEGATREE_HBASE_STORAGE_H_
00003 
#include <stdlib.h>  // mkdtemp (POSIX)
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <deque>
#include <string>
#include <iostream>
#include <fstream>
#include <vector>
#include <boost/filesystem.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

#include <megatree/storage.h>

// Thrift includes.  It may be beneficial to get these out of the header eventually
#include <stdint.h>
#include <Hbase.h>
#include <transport/TSocket.h>
#include <transport/TBufferTransports.h>
#include <protocol/TBinaryProtocol.h>
00020 
00021 
00022 namespace megatree {
00023 
// TODO: There will be big problems if an HbaseFile outlives the HbaseStorage that it was created from!
00025 
00026 
00027 /*
00028  * Path structure:  hbase://server:port/table/row/morerow/stillrow
00029  * (The rest of the path corresponds to the row).
00030  * The file data is stored in the "data:" column.
00031  */
00032 class HbaseStorage : public Storage
00033 {
00034 public:
00035   HbaseStorage(const boost::filesystem::path &_root);
00036   virtual ~HbaseStorage();
00037   
00038   virtual void getBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results);
00039   virtual void putBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data);
00040 
00041   virtual void getAsync(const boost::filesystem::path &path, GetCallback callback);
00042   virtual void putAsync(const boost::filesystem::path &path, const ByteVec& data, PutCallback callback);
00043   
00044   virtual std::string getType() {return std::string("HBaseStorage"); };
00045 
00046 private:
00047   virtual void getBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
00048                         const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results);
00049 
00050   virtual void putBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client, 
00051                         const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data);
00052   
00053 
00054   // Thrift/HBase connection
00055   boost::mutex socket_mutex;
00056   boost::shared_ptr<apache::thrift::transport::TSocket> socket;
00057   boost::shared_ptr<apache::thrift::transport::TTransport> transport;
00058   boost::shared_ptr<apache::thrift::protocol::TProtocol> protocol;
00059   boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> client;
00060   std::string table;
00061 
00062   std::string server;
00063   unsigned int port;
00064 
00065   // Asynchronous thread
00066   bool async_thread_keep_running;
00067   std::vector<boost::shared_ptr<boost::thread> > async_threads;
00068   void asyncReadThread();
00069   void asyncWriteThread();
00070   
00071   // Asynchronous reads
00072   struct ReadData
00073   {
00074     ReadData(const boost::filesystem::path& path_, const GetCallback& callback_)
00075       : path(path_), callback(callback_){}
00076     boost::filesystem::path path;
00077     GetCallback callback;
00078   };
00079   typedef std::deque<ReadData> ReadQueue;
00080   ReadQueue read_queue;  // Pop off the front
00081   boost::mutex read_queue_mutex;
00082   boost::condition read_queue_cond;
00083 
00084   // Asynchronous writes
00085   struct WriteData
00086   {
00087     WriteData(const boost::filesystem::path& path_, const ByteVec& data_, const PutCallback& callback_)
00088       : path(path_), data(data_), callback(callback_){}
00089 
00090     boost::filesystem::path path;
00091     ByteVec data;
00092     PutCallback callback;
00093   };
00094   typedef std::deque<WriteData> WriteQueue;
00095   WriteQueue write_queue;  // Pop off the front
00096   boost::mutex write_queue_mutex;
00097   boost::condition write_queue_cond;
00098 
00099   void writeFile(const std::string& row, const std::vector<unsigned char>& buffer);
00100   bool readFile(const std::string& row, std::string& buffer);
00101 };
00102 
00103 
00104 
00105 void removeHbasePath(const boost::filesystem::path &path);
00106 
00107 
00108 class HbaseTempDir : public TempDir
00109 {
00110 public:
00111   HbaseTempDir(const boost::filesystem::path &prefix = "", bool remove = true)
00112     : remove_(remove)
00113   {
00114     abort();
00115     std::string tmp_storage = prefix.string() + "XXXXXX";
00116     char *tmp = mkdtemp(&tmp_storage[0]);
00117     assert(tmp);
00118     printf("Temporary directory: %s\n", tmp);
00119 
00120     path_ = tmp;
00121   }
00122 
00123   ~HbaseTempDir()
00124   {
00125     if (remove_)
00126       boost::filesystem::remove_all(path_);
00127   }
00128 
00129   const boost::filesystem::path &getPath() const
00130   {
00131     return path_;
00132   }
00133 
00134 private:
00135   boost::filesystem::path path_;
00136   bool remove_;
00137 };
00138 
00139 
00140 
00141 }
00142 
00143 #endif


megatree_storage
Author(s): Wim Meeussen
autogenerated on Thu Nov 28 2013 11:30:26