#include <stdint.h>

#include <cstdio>
#include <deque>
#include <exception>
#include <vector>

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>

#include <megatree/blocking_queue.h>
00011
00012
00013
00014
00015
00016
00017 using boost::asio::ip::tcp;  // lets the code below write tcp::socket, tcp::acceptor and tcp::endpoint
00018
00019 typedef std::vector<uint8_t> Message;  // raw bytes of one message; ClientHandler stores the payload here without the 4-byte length header
00020
00021 class ClientHandler
00022 {
00023 public:
00024 ClientHandler(boost::asio::io_service &io_service) : socket(io_service)
00025 {
00026 }
00027
00028 ~ClientHandler()
00029 {
00030 stop();
00031 reader_thread.join();
00032 writer_thread.join();
00033 }
00034
00035 void start()
00036 {
00037 keep_running = true;
00038 reader_thread = boost::thread(boost::bind(&ClientHandler::readerThread, this));
00039 writer_thread = boost::thread(boost::bind(&ClientHandler::writerThread, this));
00040 }
00041
00042 void stop()
00043 {
00044 keep_running = false;
00045 reader_thread.interrupt();
00046 writer_thread.interrupt();
00047 }
00048
00049 void waitUntilDone()
00050 {
00051 while (keep_running)
00052 usleep(100);
00053 reader_thread.join();
00054 writer_thread.join();
00055 }
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067 void readerThread()
00068 {
00069 while (keep_running)
00070 {
00071
00072 std::vector<uint8_t> header(4);
00073 boost::system::error_code error;
00074 size_t ret;
00075 ret = boost::asio::read(socket, boost::asio::buffer(header, 4),
00076 boost::asio::transfer_all(), error);
00077 if (error) {
00078 fprintf(stderr, "Error in reader thread: %s\n", error.message().c_str());
00079 stop();
00080 }
00081 assert(ret == 4);
00082
00083
00084 uint32_t message_len = (header[0]) + (header[1] << 1) + (header[2] << 2) + (header[3] << 3);
00085 Message message(message_len - 4);
00086
00087
00088 ret = boost::asio::read(socket, boost::asio::buffer(message, message.size()),
00089 boost::asio::transfer_all(), error);
00090 if (error) {
00091 fprintf(stderr, "Error in reader thread: %s\n", error.message().c_str());
00092 stop();
00093 }
00094 assert(ret == message.size());
00095
00096
00097 size_t current_size = incoming_queue.enqueue(message);
00098 printf("Received. Incoming queue: %zu\n", current_size);
00099 }
00100 }
00101
00102 void writerThread()
00103 {
00104 while (keep_running)
00105 {
00106 size_t ret;
00107 Message message = outgoing_queue.dequeue();
00108
00109 uint32_t len = message.size() + 4;
00110 std::vector<uint8_t> header(4);
00111 header[0] = len & 0xff;
00112 header[1] = (len >> 1) & 0xff;
00113 header[2] = (len >> 2) & 0xff;
00114 header[3] = (len >> 3) & 0xff;
00115
00116
00117 ret = boost::asio::write(socket, boost::asio::buffer(header, header.size()));
00118 assert(ret == 4);
00119
00120
00121 ret = boost::asio::write(socket, boost::asio::buffer(message, message.size()));
00122 assert(ret == message.size());
00123 }
00124 }
00125
00126 tcp::socket socket;
00127 bool keep_running;
00128 boost::thread reader_thread, writer_thread;
00129
00130 megatree::BlockingQueue<Message> incoming_queue, outgoing_queue;
00131 };
00132
00133 int main(int argc, char** argv)
00134 {
00135 printf("Hello world\n");
00136
00137 try {
00138 boost::asio::io_service io_service;
00139 tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 22322));
00140 for (;;) {
00141 ClientHandler handler(io_service);
00142
00143 acceptor.accept(handler.socket);
00144 printf("Accepted!\n");
00145 handler.start();
00146 handler.waitUntilDone();
00147 }
00148 }
00149 catch (std::exception &e)
00150 {
00151 fprintf(stderr, "Exception: %s\n", e.what());
00152 }
00153 }
00154