00001 #include <megatree/hbase_storage.h>
00002
00003 #include <algorithm>
00004
00005 #define USE_FRAMED_TRANSPORT 0
00006
00007 namespace megatree {
00008
00009 using namespace apache::hadoop::hbase::thrift;
00010
// Tuning knobs for the asynchronous worker threads.

// Upper bound on the number of queued read requests a reader thread drains
// from the read queue for a single batch.
static const size_t ASYNC_READ_BATCH_SIZE = 10000;

// Upper bound on the number of queued write requests the writer thread drains
// from the write queue for a single batch.
static const size_t ASYNC_WRITE_BATCH_SIZE = 100;

// Number of worker threads started by the HbaseStorage constructor.
static const unsigned int NUM_READER_THREADS = 10;
static const unsigned int NUM_WRITER_THREADS = 1;
00016
00017
00018
00019 void parseHbasePath(const boost::filesystem::path &path_,
00020 std::string &server, unsigned int &port, std::string &table)
00021 {
00022 std::string path = path_.string();
00023 assert(path.substr(0, 8) == std::string("hbase://"));
00024
00025
00026 size_t serverport_end = path.find('/', 8);
00027 std::string serverport = path.substr(8, serverport_end - 8);
00028
00029 size_t server_end = serverport.find(':', 0);
00030 if (server_end == std::string::npos) {
00031 server = serverport;
00032 port = 9090;
00033 }
00034 else {
00035 server = serverport.substr(0, server_end);
00036 std::string port_str = serverport.substr(server_end + 1);
00037 port = atoi(port_str.c_str());
00038 }
00039
00040
00041 table = path.substr(serverport_end + 1);
00042 while (table[table.size() - 1] == '/')
00043 table.resize(table.size() - 1);
00044 }
00045
00046 HbaseStorage::HbaseStorage(const boost::filesystem::path &_root)
00047 : async_thread_keep_running(true)
00048 {
00049 using namespace apache::thrift::transport;
00050 using namespace apache::thrift::protocol;
00051
00052 parseHbasePath(_root, server, port, table);
00053 printf("Connecting to hbase %s:%u, table %s\n", server.c_str(), port, table.c_str());
00054
00055
00056 socket.reset(new TSocket(server, port));
00057 #if USE_FRAMED_TRANSPORT
00058 transport.reset(new TFramedTransport(socket));
00059 #else
00060 transport.reset(new TBufferedTransport(socket));
00061 #endif
00062 protocol.reset(new TBinaryProtocol(transport));
00063 client.reset(new HbaseClient(protocol));
00064 transport->open();
00065
00066
00067 std::vector<std::string> table_names;
00068 client->getTableNames(table_names);
00069 std::vector<std::string>::iterator found = std::find(table_names.begin(), table_names.end(), table);
00070 if (found == table_names.end()) {
00071
00072 std::vector<ColumnDescriptor> column_families(1);
00073 column_families[0].name = "data";
00074 client->createTable(table, column_families);
00075 }
00076
00077
00078 async_thread_keep_running = true;
00079
00080 async_threads.resize(NUM_READER_THREADS + NUM_WRITER_THREADS);
00081 for(size_t i = 0; i < NUM_READER_THREADS; ++i)
00082 async_threads[i].reset(new boost::thread(boost::bind(&HbaseStorage::asyncReadThread, this)));
00083
00084 for(size_t i = 0; i < NUM_WRITER_THREADS; ++i)
00085 async_threads[i + NUM_READER_THREADS].reset(new boost::thread(boost::bind(&HbaseStorage::asyncWriteThread, this)));
00086 }
00087
00088 HbaseStorage::~HbaseStorage()
00089 {
00090
00091 async_thread_keep_running = false;
00092 write_queue_cond.notify_all();
00093 read_queue_cond.notify_all();
00094
00095
00096
00097
00098
00099
00100
00101 {
00102
00103
00104
00105 boost::mutex::scoped_lock writer_lock(write_queue_mutex);
00106 boost::mutex::scoped_lock reader_lock(read_queue_mutex);
00107 for(size_t i = 0; i < async_threads.size(); ++i) {
00108 async_threads[i]->interrupt();
00109 }
00110 }
00111 for(size_t i = 0; i < async_threads.size(); ++i) {
00112 if (!async_threads[i]->timed_join(boost::posix_time::seconds(10))) {
00113 fprintf(stderr, "Difficulty joining boost thread %zu during HbaseStorage destruction\n", i);
00114 }
00115 async_threads[i]->join();
00116 }
00117
00118 transport->close();
00119 }
00120
00121
00122 void HbaseStorage::getBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results)
00123 {
00124 boost::mutex::scoped_lock lock(socket_mutex);
00125 getBatch(client, paths, results);
00126 }
00127
00128 void HbaseStorage::getBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
00129 const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &results)
00130 {
00131 results.resize(paths.size());
00132
00133 std::vector<std::string> rows(paths.size());
00134 for (size_t i = 0; i < paths.size(); ++i)
00135 rows[i] = paths[i].string();
00136
00137
00138 std::vector<TRowResult> row_results;
00139
00140
00141 passed_client->getRows(row_results, table, rows);
00142
00143
00144
00145 if (row_results.empty()) {
00146 fprintf(stderr, "getBatch failed. Not sure what to do here\n");
00147 results.clear();
00148 return;
00149 }
00150 if (row_results.size() != rows.size())
00151 {
00152 fprintf(stderr, "Requested %zu rows, but received %zu\n", rows.size(), row_results.size());
00153 fprintf(stderr, "Rows requested:");
00154 for (size_t i = 0; i < rows.size(); ++i)
00155 fprintf(stderr, " %s", rows[i].c_str());
00156 fprintf(stderr, "\n");
00157 abort();
00158 }
00159
00160
00161 for (size_t i = 0; i < row_results.size(); ++i)
00162 {
00163
00164 std::map<std::string, TCell>::iterator it = row_results[i].columns.find("data:");
00165 if (it == row_results[i].columns.end()) {
00166 fprintf(stderr, "Results for row %s were not empty (%zu), but column \"data:\" was not found!\n",
00167 paths[i].string().c_str(), results[i].size());
00168 abort();
00169 }
00170
00171 results[i].resize(it->second.value.size());
00172 memcpy(&(results[i][0]), &(it->second.value[0]), it->second.value.size());
00173 }
00174 }
00175
00176 void HbaseStorage::putBatch(const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data)
00177 {
00178 boost::mutex::scoped_lock lock(socket_mutex);
00179 putBatch(client, paths, data);
00180 }
00181
00182 void HbaseStorage::putBatch(boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> passed_client,
00183 const std::vector<boost::filesystem::path> &paths, std::vector<ByteVec> &data)
00184 {
00185 assert(paths.size() == data.size());
00186
00187 std::vector<BatchMutation> bm(paths.size());
00188 for (size_t i = 0; i < paths.size(); ++i)
00189 {
00190 bm[i].row = paths[i].string();
00191 bm[i].mutations.resize(1);
00192 bm[i].mutations[0].column = "data:";
00193 bm[i].mutations[0].value.resize(data[i].size());
00194 memcpy(&(bm[i].mutations[0].value[0]), &(data[i][0]), data[i].size());
00195 }
00196
00197 passed_client->mutateRows(table, bm);
00198 }
00199
00200
00201 void HbaseStorage::getAsync(const boost::filesystem::path &path, GetCallback callback)
00202 {
00203
00204 boost::mutex::scoped_lock lock(read_queue_mutex);
00205 read_queue.push_back(ReadData(path, callback));
00206 read_queue_cond.notify_one();
00207 }
00208
00209 void HbaseStorage::putAsync(const boost::filesystem::path &path, const ByteVec& data, PutCallback callback)
00210 {
00211 boost::mutex::scoped_lock lock(write_queue_mutex);
00212 write_queue.push_back(WriteData(path, data, callback));
00213 write_queue_cond.notify_one();
00214 }
00215
00216
00217 void HbaseStorage::asyncReadThread()
00218 {
00219 boost::shared_ptr<apache::thrift::transport::TSocket> thread_socket;
00220 boost::shared_ptr<apache::thrift::transport::TTransport> thread_transport;
00221 boost::shared_ptr<apache::thrift::protocol::TProtocol> thread_protocol;
00222 boost::shared_ptr<apache::hadoop::hbase::thrift::HbaseClient> thread_client;
00223
00224 using namespace apache::thrift::transport;
00225 using namespace apache::thrift::protocol;
00226
00227
00228 thread_socket.reset(new TSocket(server, port));
00229 #if USE_FRAMED_TRANSPORT
00230 thread_transport.reset(new TFramedTransport(thread_socket));
00231 #else
00232 thread_transport.reset(new TBufferedTransport(thread_socket));
00233 #endif
00234 thread_protocol.reset(new TBinaryProtocol(thread_transport));
00235 thread_client.reset(new HbaseClient(thread_protocol));
00236 thread_transport->open();
00237
00238 while (async_thread_keep_running)
00239 {
00240 std::vector<boost::filesystem::path> paths;
00241 std::vector<GetCallback> callbacks;
00242
00243 {
00244
00245 boost::mutex::scoped_lock lock(read_queue_mutex);
00246 while (read_queue.empty())
00247 read_queue_cond.wait(lock);
00248
00249
00250 for (size_t i = 0; i < ASYNC_READ_BATCH_SIZE && !read_queue.empty(); ++i)
00251 {
00252 paths.push_back(read_queue.front().path);
00253 callbacks.push_back(read_queue.front().callback);
00254 read_queue.pop_front();
00255 }
00256 }
00257
00258
00259 std::vector<ByteVec> results;
00260 getBatch(thread_client, paths, results);
00261
00262
00263 for (size_t i = 0; i < results.size(); ++i)
00264 {
00265 callbacks[i](results[i]);
00266 }
00267 }
00268 }
00269
00270
00271 void HbaseStorage::asyncWriteThread()
00272 {
00273 while (async_thread_keep_running)
00274 {
00275 std::vector<boost::filesystem::path> paths;
00276 std::vector<ByteVec> data;
00277 std::vector<PutCallback> callbacks;
00278
00279 {
00280
00281 boost::mutex::scoped_lock lock(write_queue_mutex);
00282 while (write_queue.empty())
00283 write_queue_cond.wait(lock);
00284
00285
00286 for (size_t i = 0; i < ASYNC_WRITE_BATCH_SIZE && !write_queue.empty(); ++i)
00287 {
00288 paths.push_back(write_queue.front().path);
00289 data.push_back(write_queue.front().data);
00290 callbacks.push_back(write_queue.front().callback);
00291 write_queue.pop_front();
00292 }
00293 }
00294
00295
00296 putBatch(paths, data);
00297
00298
00299 for (size_t i = 0; i < callbacks.size(); ++i)
00300 {
00301 callbacks[i]();
00302 }
00303 }
00304 }
00305
00306
00307 bool HbaseStorage::readFile(const std::string& row, std::string& buffer)
00308 {
00309 boost::mutex::scoped_lock lock(socket_mutex);
00310
00311
00312 std::vector<TRowResult> results;
00313 client->getRow(results, table, row);
00314
00315 if (results.size() == 0)
00316 return false;
00317
00318
00319 std::map<std::string, TCell>::iterator it = results[0].columns.find("data:");
00320 if (it == results[0].columns.end()) {
00321 fprintf(stderr, "Results for row %s were not empty (%zu), but column \"data:\" was not found!\n",
00322 row.c_str(), results.size());
00323 abort();
00324 }
00325 std::string &row_data = it->second.value;
00326 buffer.swap(row_data);
00327 return true;
00328 }
00329
00330 void HbaseStorage::writeFile(const std::string& row, const std::vector<unsigned char>& buffer)
00331 {
00332 boost::mutex::scoped_lock lock(socket_mutex);
00333
00334 std::vector<Mutation> m(1);
00335 m[0].column = "data:";
00336 m[0].value.resize(buffer.size());
00337 memcpy(&(m[0].value[0]), &buffer[0], buffer.size());
00338
00339 client->mutateRow(table, row, m);
00340 }
00341
00342
00343
00344
00345 void removeHbasePath(const boost::filesystem::path &path)
00346 {
00347 using namespace apache::thrift::transport;
00348 using namespace apache::thrift::protocol;
00349
00350
00351 std::string server;
00352 unsigned int port;
00353 std::string table;
00354 parseHbasePath(path, server, port, table);
00355
00356
00357 boost::shared_ptr<TSocket> socket(new TSocket(server, port));
00358 #if USE_FRAMED_TRANSPORT
00359 boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
00360 #else
00361 boost::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
00362 #endif
00363 boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
00364 apache::hadoop::hbase::thrift::HbaseClient client(protocol);
00365 transport->open();
00366
00367
00368 try {
00369 client.disableTable(table);
00370 client.deleteTable(table);
00371 }
00372 catch (const apache::hadoop::hbase::thrift::IOError &ex) {
00373
00374 fprintf(stderr, "Warning: Exception while deleting table %s: %s\n", table.c_str(), ex.what());
00375 }
00376 transport->close();
00377 }
00378
00379 }