00001 #include "Communication.h" 00002 00003 namespace castor { namespace net { 00004 00005 boost::mutex Communication::mutex; 00006 CommunicationPtr Communication::instance; 00007 00008 Communication::Communication() : 00009 threadPool(), service(), initialised(false) 00010 { 00011 } 00012 00013 Communication::~Communication() { 00014 } 00015 00016 CommunicationPtr Communication::getInstance() { 00017 00018 boost::mutex::scoped_lock lock(mutex); 00019 00020 if (!instance) { 00021 instance = CommunicationPtr(new Communication()); 00022 } 00023 00024 return instance; 00025 } 00026 00027 CastorChannelPtr Communication::create(const std::string spec) throw(NetException) { 00028 return CastorChannel::create(this->service, spec); 00029 } 00030 00031 CastorChannelPtr Communication::create(const NetAddress &address) throw(NetException) { 00032 return CastorChannel::create(this->service, address); 00033 } 00034 00035 void Communication::run() { 00036 std::cout << "geminga Communication run " << std::endl; 00037 this->service.run(); 00038 std::cout << "geminga Communication stopped " << std::endl; 00039 } 00040 00041 void Communication::start(size_t threads) { 00042 00043 boost::mutex::scoped_lock lock(mutex); 00044 00045 if (this->initialised) return; 00046 00047 this->initialised = true; 00048 00049 for (size_t i = 0; i < threads; i++) { 00050 this->threadPool.push_back(ThreadPtr(new asio::thread( 00051 boost::bind(&Communication::run, this)))); 00052 } 00053 00054 initialised = true; 00055 std::cout << "Castor communication subsystem started (" << threads << " threads)" << std::endl; 00056 } 00057 00058 void Communication::stop() { 00059 00060 boost::mutex::scoped_lock lock(mutex); 00061 00062 this->service.stop(); 00063 00064 for (ThreadPool::iterator itr = this->threadPool.begin(); 00065 itr != this->threadPool.end(); itr++) 00066 { 00067 (*itr)->join(); 00068 } 00069 00070 this->threadPool.clear(); 00071 00072 // std::cout << "Castor communication subsystem stopped" << std::endl; 00073 } 00074 00075 void Communication::wait() { 00076 std::cout << "Start join" << std::endl; 00077 this->threadPool[0]->join(); 00078 std::cout << "Stop join" << std::endl; 00079 } 00080 00081 asio::io_service &Communication::asioService() { 00082 return this->service; 00083 } 00084 00085 } } 00086