hbase_storage.cpp
#include <megatree/hbase_storage.h>

#include <algorithm>

#define USE_FRAMED_TRANSPORT 0

namespace megatree {

using namespace apache::hadoop::hbase::thrift;

const static size_t ASYNC_READ_BATCH_SIZE = 10000;
//const static size_t ASYNC_READ_BATCH_SIZE = 100;
const static size_t ASYNC_WRITE_BATCH_SIZE = 100;
const static unsigned int NUM_READER_THREADS = 10;
const static unsigned int NUM_WRITER_THREADS = 1;

//static size_t nodefiles_touched = 0;

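// Splits an "hbase://server[:port]/table" path into its server, port, and
// table components.  The port defaults to 9090 when omitted, and trailing
// slashes are stripped from the table name.  For example (illustrative only),
// "hbase://localhost:9090/megatree" yields server == "localhost",
// port == 9090, table == "megatree".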
void parseHbasePath(const boost::filesystem::path &path_,
                    std::string &server, unsigned int &port, std::string &table)
{
  std::string path = path_.string();
  assert(path.substr(0, 8) == std::string("hbase://"));

  // Separates off the server and port.
  size_t serverport_end = path.find('/', 8);
  std::string serverport = path.substr(8, serverport_end - 8);

  size_t server_end = serverport.find(':', 0);
  if (server_end == std::string::npos) {
    server = serverport;
    port = 9090;  // Default port of the HBase Thrift server.
  }
  else {
    server = serverport.substr(0, server_end);
    std::string port_str = serverport.substr(server_end + 1);
    port = atoi(port_str.c_str());
  }

  // The rest is the table name; strip any trailing slashes.
  table = path.substr(serverport_end + 1);
  while (!table.empty() && table[table.size() - 1] == '/')
    table.resize(table.size() - 1);
}

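// Opens a Thrift connection to the HBase server named in _root, creates the
// table (with a single "data" column family) if it does not already exist,
// and starts the pools of asynchronous reader and writer threads.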
HbaseStorage::HbaseStorage(const boost::filesystem::path &_root)
  : async_thread_keep_running(true)
{
  using namespace apache::thrift::transport;
  using namespace apache::thrift::protocol;

  parseHbasePath(_root, server, port, table);
  printf("Connecting to hbase %s:%u, table %s\n", server.c_str(), port, table.c_str());

  // Connects to Hbase.
  socket.reset(new TSocket(server, port));
#if USE_FRAMED_TRANSPORT
  transport.reset(new TFramedTransport(socket));
#else
  transport.reset(new TBufferedTransport(socket));
#endif
  protocol.reset(new TBinaryProtocol(transport));
  client.reset(new HbaseClient(protocol));
  transport->open();

  // Checks whether the table already exists.
  std::vector<std::string> table_names;
  client->getTableNames(table_names);
  std::vector<std::string>::iterator found = std::find(table_names.begin(), table_names.end(), table);
  if (found == table_names.end()) {
    // Creates the table with a single "data" column family.
    std::vector<ColumnDescriptor> column_families(1);
    column_families[0].name = "data";
    client->createTable(table, column_families);
  }

  // Starts the async read/write threads.
  async_thread_keep_running = true;

  async_threads.resize(NUM_READER_THREADS + NUM_WRITER_THREADS);
  for(size_t i = 0; i < NUM_READER_THREADS; ++i)
    async_threads[i].reset(new boost::thread(boost::bind(&HbaseStorage::asyncReadThread, this)));

  for(size_t i = 0; i < NUM_WRITER_THREADS; ++i)
    async_threads[i + NUM_READER_THREADS].reset(new boost::thread(boost::bind(&HbaseStorage::asyncWriteThread, this)));
}

HbaseStorage::~HbaseStorage()
{
  // Stops the async threads.
  async_thread_keep_running = false;
  write_queue_cond.notify_all();
  read_queue_cond.notify_all();

  // boost::thread has a bug in interrupt(): interrupting does not always
  // wake a thread blocked on a condition variable.  The workaround is to
  // hold the associated mutex while interrupting.
  //
  // See: https://svn.boost.org/trac/boost/ticket/2330
  // Workaround: https://svn.boost.org/trac/boost/ticket/3735
  {
    // TODO: Locking these mutexes together is bad.  The threads should be
    // separated into a thread group of readers and a thread group of
    // writers.
    boost::mutex::scoped_lock writer_lock(write_queue_mutex);
    boost::mutex::scoped_lock reader_lock(read_queue_mutex);
    for(size_t i = 0; i < async_threads.size(); ++i) {
      async_threads[i]->interrupt();
    }
  }
  for(size_t i = 0; i < async_threads.size(); ++i) {
    if (!async_threads[i]->timed_join(boost::posix_time::seconds(10))) {
      fprintf(stderr, "Difficulty joining boost thread %zu during HbaseStorage destruction\n", i);
      async_threads[i]->join();  // Falls back to an unbounded join.
    }
  }

  transport->close();
}

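// Fetches a batch of rows synchronously.  The public overload serializes
// access to the shared client with socket_mutex; the overload taking an
// explicit client is used by the reader threads, each of which owns its own
// connection.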
void HbaseStorage::getBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results)
{
  boost::mutex::scoped_lock lock(socket_mutex);
  getBatch(client, paths, results);
}

void HbaseStorage::getBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
                            const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results)
{
  results.resize(paths.size());

  std::vector<std::string> rows(paths.size());
  for (size_t i = 0; i < paths.size(); ++i)
    rows[i] = paths[i].string();

  // Gets the rows in a single request.
  std::vector<TRowResult> row_results;
  //boost::posix_time::ptime started = boost::posix_time::microsec_clock::universal_time();
  //printf("Making a get batch request\n");
  passed_client->getRows(row_results, table, rows);
  //boost::posix_time::ptime finished = boost::posix_time::microsec_clock::universal_time();
  //nodefiles_touched += paths.size();
  //printf("Get batch for size %zu, finished in %.4f seconds, nodefiles touched %zu\n", paths.size(), (finished - started).total_milliseconds() / 1000.0f, nodefiles_touched);
  if (row_results.empty()) {
    fprintf(stderr, "getBatch failed.  Not sure what to do here\n");
    results.clear();
    return;
  }
  if (row_results.size() != rows.size())
  {
    fprintf(stderr, "Requested %zu rows, but received %zu\n", rows.size(), row_results.size());
    fprintf(stderr, "Rows requested:");
    for (size_t i = 0; i < rows.size(); ++i)
      fprintf(stderr, " %s", rows[i].c_str());
    fprintf(stderr, "\n");
    abort();
  }

  // Extracts the file data from each row result.
  for (size_t i = 0; i < row_results.size(); ++i)
  {
    // Finds the "data:" column.
    std::map<std::string, TCell>::iterator it = row_results[i].columns.find("data:");
    if (it == row_results[i].columns.end()) {
      fprintf(stderr, "Row %s was returned (%zu columns), but column \"data:\" was not found!\n",
              paths[i].string().c_str(), row_results[i].columns.size());
      abort();
    }

    results[i].resize(it->second.value.size());
    memcpy(&(results[i][0]), &(it->second.value[0]), it->second.value.size());  // TODO: Removing this copy would be nice
  }
}

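// Writes a batch of rows as a single mutateRows call.  The public overload
// serializes access to the shared client with socket_mutex.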
void HbaseStorage::putBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data)
{
  boost::mutex::scoped_lock lock(socket_mutex);
  putBatch(client, paths, data);
}

void HbaseStorage::putBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
    const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data)
{
  assert(paths.size() == data.size());

  std::vector<BatchMutation> bm(paths.size());
  for (size_t i = 0; i < paths.size(); ++i)
  {
    bm[i].row = paths[i].string();
    bm[i].mutations.resize(1);
    bm[i].mutations[0].column = "data:";
    bm[i].mutations[0].value.resize(data[i].size());
    memcpy(&(bm[i].mutations[0].value[0]), &(data[i][0]), data[i].size());
  }

  passed_client->mutateRows(table, bm);
}

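// Asynchronous interface: getAsync and putAsync only enqueue the request and
// return immediately.  Background threads drain the queues in batches and
// invoke the supplied callback when each request completes.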
void HbaseStorage::getAsync(const boost::filesystem::path &path, GetCallback callback)
{
  // TODO: consider making this block when the read queue fills up.
  boost::mutex::scoped_lock lock(read_queue_mutex);
  read_queue.push_back(ReadData(path, callback));
  read_queue_cond.notify_one();
}

void HbaseStorage::putAsync(const boost::filesystem::path &path, const ByteVec& data, PutCallback callback)
{
  boost::mutex::scoped_lock lock(write_queue_mutex);
  write_queue.push_back(WriteData(path, data, callback));
  write_queue_cond.notify_one();
}

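// Worker loop for the reader threads.  Each reader opens its own Thrift
// connection so that reads do not contend on the shared client, then
// repeatedly pulls up to ASYNC_READ_BATCH_SIZE requests off the read queue,
// fetches them in one getRows call, and fires the callbacks.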
void HbaseStorage::asyncReadThread()
{
  boost::shared_ptr<apache::thrift::transport::TSocket> thread_socket;
  boost::shared_ptr<apache::thrift::transport::TTransport> thread_transport;
  boost::shared_ptr<apache::thrift::protocol::TProtocol> thread_protocol;
  boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> thread_client;

  using namespace apache::thrift::transport;
  using namespace apache::thrift::protocol;

  // Connects to Hbase with a connection private to this thread.
  thread_socket.reset(new TSocket(server, port));
#if USE_FRAMED_TRANSPORT
  thread_transport.reset(new TFramedTransport(thread_socket));
#else
  thread_transport.reset(new TBufferedTransport(thread_socket));
#endif
  thread_protocol.reset(new TBinaryProtocol(thread_transport));
  thread_client.reset(new HbaseClient(thread_protocol));
  thread_transport->open();

  while (async_thread_keep_running)
  {
    std::vector<boost::filesystem::path> paths;
    std::vector<GetCallback> callbacks;

    {
      // Waits for the read queue to have something.  The wait is a boost
      // interruption point, which is how the destructor unblocks idle threads.
      boost::mutex::scoped_lock lock(read_queue_mutex);
      while (read_queue.empty())
        read_queue_cond.wait(lock);

      // Removes a batch of read requests from the queue.
      for (size_t i = 0; i < ASYNC_READ_BATCH_SIZE && !read_queue.empty(); ++i)
      {
        paths.push_back(read_queue.front().path);
        callbacks.push_back(read_queue.front().callback);
        read_queue.pop_front();
      }
    }

    // Submits the batch of read requests.
    std::vector<ByteVec> results;
    getBatch(thread_client, paths, results);

    // Triggers the callbacks.
    for (size_t i = 0; i < results.size(); ++i)
    {
      callbacks[i](results[i]);
    }
  }
}

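// Worker loop for the writer thread(s).  Unlike the readers, writes go through
// the public putBatch and therefore share the main client connection under
// socket_mutex; batches are capped at ASYNC_WRITE_BATCH_SIZE requests.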
void HbaseStorage::asyncWriteThread()
{
  while (async_thread_keep_running)
  {
    std::vector<boost::filesystem::path> paths;
    std::vector<ByteVec> data;
    std::vector<PutCallback> callbacks;

    {
      // Waits for the write queue to have something.
      boost::mutex::scoped_lock lock(write_queue_mutex);
      while (write_queue.empty())
        write_queue_cond.wait(lock);

      // Removes a batch of write requests from the queue.
      for (size_t i = 0; i < ASYNC_WRITE_BATCH_SIZE && !write_queue.empty(); ++i)
      {
        paths.push_back(write_queue.front().path);
        data.push_back(write_queue.front().data);
        callbacks.push_back(write_queue.front().callback);
        write_queue.pop_front();
      }
    }

    // Submits the batch of write requests.
    putBatch(paths, data);

    // Triggers the callbacks.
    for (size_t i = 0; i < callbacks.size(); ++i)
    {
      callbacks[i]();
    }
  }
}

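// Synchronously reads the "data:" cell of a single row into buffer.
// Returns false if the row does not exist.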
bool HbaseStorage::readFile(const std::string& row, std::string& buffer)
{
  boost::mutex::scoped_lock lock(socket_mutex);

  // Attempts to get the row.
  std::vector<TRowResult> results;
  client->getRow(results, table, row);

  if (results.size() == 0)
    return false;

  // Gets the file data from the row query.
  std::map<std::string, TCell>::iterator it = results[0].columns.find("data:");
  if (it == results[0].columns.end()) {
    fprintf(stderr, "Results for row %s were not empty (%zu), but column \"data:\" was not found!\n",
            row.c_str(), results.size());
    abort();
  }
  std::string &row_data = it->second.value;
  buffer.swap(row_data);  // swap avoids copying the cell contents
  return true;
}

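// Synchronously writes buffer into the "data:" cell of a single row.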
void HbaseStorage::writeFile(const std::string& row, const std::vector<unsigned char>& buffer)
{
  boost::mutex::scoped_lock lock(socket_mutex);

  std::vector<Mutation> m(1);
  m[0].column = "data:";
  m[0].value.resize(buffer.size());
  memcpy(&(m[0].value[0]), &buffer[0], buffer.size());

  client->mutateRow(table, row, m);
}

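// Deletes the HBase table referenced by an "hbase://server[:port]/table" path.
// The table must be disabled before it can be deleted; if it does not exist,
// the resulting IOError is caught and reported as a warning.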
void removeHbasePath(const boost::filesystem::path &path)
{
  using namespace apache::thrift::transport;
  using namespace apache::thrift::protocol;

  // Parses the path.
  std::string server;
  unsigned int port;
  std::string table;
  parseHbasePath(path, server, port, table);

  // Connects to Hbase.
  boost::shared_ptr<TSocket> socket(new TSocket(server, port));
#if USE_FRAMED_TRANSPORT
  boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
#else
  boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
#endif
  boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
  apache::hadoop::hbase::thrift::HbaseClient client(protocol);
  transport->open();

  // Disables and deletes the table.
  try {
    client.disableTable(table);
    client.deleteTable(table);
  }
  catch (const apache::hadoop::hbase::thrift::IOError &ex) {
    // The table most likely did not exist in the first place.
    fprintf(stderr, "Warning: Exception while deleting table %s: %s\n", table.c_str(), ex.what());
  }
  transport->close();
}

}  // namespace megatree
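
Usage sketch (not part of the original file): a minimal, hypothetical example of driving HbaseStorage through its asynchronous interface. It assumes the class and ByteVec type declared in megatree/hbase_storage.h; the callback signatures are inferred from how this file invokes them (GetCallback receives the fetched bytes, PutCallback takes no arguments), and the path literal and row names are placeholders.

// Illustrative only; assumes ByteVec behaves like a std::vector of bytes.
#include <megatree/hbase_storage.h>

#include <cstdio>

static void onGet(const megatree::ByteVec &data)
{
  printf("fetched %zu bytes\n", data.size());
}

static void onPut()
{
  printf("write acknowledged\n");
}

int main()
{
  // "hbase://server[:port]/table", as accepted by parseHbasePath above.
  megatree::HbaseStorage storage("hbase://localhost:9090/megatree");

  megatree::ByteVec payload(128, 0);
  storage.putAsync("example_row", payload, onPut);  // queued for the writer thread
  storage.getAsync("example_row", onGet);           // queued for a reader thread

  // The destructor joins the asynchronous reader and writer threads.
  return 0;
}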


megatree_storage
Author(s): Wim Meeussen
autogenerated on Thu Nov 28 2013 11:30:26