streaming_server.cpp
Go to the documentation of this file.
00001 #include <cstdio>
00002 #include <deque>
00003 
00004 #include <boost/thread.hpp>
00005 #include <boost/thread/mutex.hpp>
00006 #include <boost/thread/condition.hpp>
00007 
00008 #include <boost/asio.hpp>
00009 
00010 #include <megatree/blocking_queue.h>
00011 
00012 // Threads:
00013 // A. Request reader thread (from client)
00014 // B. Response writer thread (to client)
00015 // C. Thrift thread
00016 
00017 using boost::asio::ip::tcp;
00018 
00019 typedef std::vector<uint8_t> Message;
00020 
00021 class ClientHandler
00022 {
00023 public:
00024   ClientHandler(boost::asio::io_service &io_service) : socket(io_service)
00025   {
00026   }
00027   
00028   ~ClientHandler()
00029   {
00030     stop();
00031     reader_thread.join();
00032     writer_thread.join();
00033   }
00034 
00035   void start()
00036   {
00037     keep_running = true;
00038     reader_thread = boost::thread(boost::bind(&ClientHandler::readerThread, this));
00039     writer_thread = boost::thread(boost::bind(&ClientHandler::writerThread, this));
00040   }
00041   
00042   void stop()
00043   {
00044     keep_running = false;
00045     reader_thread.interrupt();
00046     writer_thread.interrupt();
00047   }
00048 
00049   void waitUntilDone()
00050   {
00051     while (keep_running)
00052       usleep(100);
00053     reader_thread.join();
00054     writer_thread.join();
00055   }
00056 
00057   // Next:
00058   // - Open up a storage (from a command line parameter, sent directly to the storage factory)
00059   // - Create the storage thread, that:
00060   //   - Pops incoming queue
00061   //   - Calls get (or getBatch) on storage (put???)
00062   //   - Pushes the results onto outgoing queue
00063   //
00064   // Later: Create StreamingStorage, which connects to the streaming server
00065   // Even later: Create a "storage tester" which checks that the storage works properly
00066 
00067   void readerThread()
00068   {
00069     while (keep_running)
00070     {
00071       // Reads the message size
00072       std::vector<uint8_t> header(4);
00073       boost::system::error_code error;
00074       size_t ret;
00075       ret = boost::asio::read(socket, boost::asio::buffer(header, 4),
00076                               boost::asio::transfer_all(), error);
00077       if (error) {
00078         fprintf(stderr, "Error in reader thread: %s\n", error.message().c_str());
00079         stop();
00080       }
00081       assert(ret == 4);
00082       
00083       // Little endian
00084       uint32_t message_len = (header[0]) + (header[1] << 1) + (header[2] << 2) + (header[3] << 3);
00085       Message message(message_len - 4);
00086 
00087       // Reads in the rest of the message
00088       ret = boost::asio::read(socket, boost::asio::buffer(message, message.size()),
00089                               boost::asio::transfer_all(), error);
00090       if (error) {
00091         fprintf(stderr, "Error in reader thread: %s\n", error.message().c_str());
00092         stop();
00093       }
00094       assert(ret == message.size());
00095 
00096       // Enqueues the received message
00097       size_t current_size = incoming_queue.enqueue(message);
00098       printf("Received.  Incoming queue: %zu\n", current_size);
00099     }
00100   }
00101 
00102   void writerThread()
00103   {
00104     while (keep_running)
00105     {
00106       size_t ret;
00107       Message message = outgoing_queue.dequeue();
00108 
00109       uint32_t len = message.size() + 4;
00110       std::vector<uint8_t> header(4);
00111       header[0] = len & 0xff;
00112       header[1] = (len >> 1) & 0xff;
00113       header[2] = (len >> 2) & 0xff;
00114       header[3] = (len >> 3) & 0xff;
00115 
00116       // Writes the header
00117       ret = boost::asio::write(socket, boost::asio::buffer(header, header.size()));
00118       assert(ret == 4);
00119 
00120       // Writes the message
00121       ret = boost::asio::write(socket, boost::asio::buffer(message, message.size()));
00122       assert(ret == message.size());
00123     }
00124   }
00125 
00126   tcp::socket socket;
00127   bool keep_running;
00128   boost::thread reader_thread, writer_thread;
00129 
00130   megatree::BlockingQueue<Message> incoming_queue, outgoing_queue;
00131 };
00132 
00133 int main(int argc, char** argv)
00134 {
00135   printf("Hello world\n");
00136 
00137   try {
00138     boost::asio::io_service io_service;
00139     tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 22322));
00140     for (;;) {
00141       ClientHandler handler(io_service);
00142       //tcp::socket socket(io_service);
00143       acceptor.accept(handler.socket);
00144       printf("Accepted!\n");
00145       handler.start();
00146       handler.waitUntilDone();
00147     }
00148   }
00149   catch (std::exception &e)
00150   {
00151     fprintf(stderr, "Exception: %s\n", e.what());
00152   }
00153 }
00154 


megatree_storage
Author(s): Wim Meeussen
autogenerated on Thu Nov 28 2013 11:30:26